diff --git a/azure-source-connector/build.gradle.kts b/azure-source-connector/build.gradle.kts
index 06487a525..6b6b44a49 100644
--- a/azure-source-connector/build.gradle.kts
+++ b/azure-source-connector/build.gradle.kts
@@ -59,7 +59,7 @@ dependencies {
     compileOnly("org.apache.velocity:velocity-engine-core:2.4.1")
     compileOnly("org.apache.velocity.tools:velocity-tools-generic:3.1")

-    implementation("commons-io:commons-io:2.19.0")
+    implementation("commons-io:commons-io:2.18.0")
     implementation("org.apache.commons:commons-lang3:3.17.0")
     implementation(project(":commons"))
     implementation(apache.commons.collection4)
diff --git a/azure-source-connector/src/integration-test/java/aiven/kafka/connect/azure/source/AbstractIntegrationTest.java b/azure-source-connector/src/integration-test/java/aiven/kafka/connect/azure/source/AbstractIntegrationTest.java
index 9895489df..405ed13f2 100644
--- a/azure-source-connector/src/integration-test/java/aiven/kafka/connect/azure/source/AbstractIntegrationTest.java
+++ b/azure-source-connector/src/integration-test/java/aiven/kafka/connect/azure/source/AbstractIntegrationTest.java
@@ -26,44 +26,42 @@
 import java.time.format.DateTimeFormatter;
 import java.util.Arrays;
 import java.util.HashMap;
-import java.util.List;
+import java.util.HashSet;
 import java.util.Map;
-import java.util.Properties;
+import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;

-import org.apache.kafka.clients.admin.AdminClient;
-import org.apache.kafka.clients.admin.AdminClientConfig;
-import org.apache.kafka.clients.admin.NewTopic;
+import io.aiven.commons.kafka.testkit.KafkaIntegrationTestBase;
+import io.aiven.commons.kafka.testkit.KafkaManager;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;

+import io.aiven.kafka.connect.azure.source.AzureBlobSourceConnector;
 import io.aiven.kafka.connect.common.config.CompressionType;

 import com.azure.storage.blob.BlobContainerAsyncClient;
 import com.azure.storage.blob.BlobServiceClient;
 import com.azure.storage.blob.BlobServiceClientBuilder;
-import com.github.dockerjava.api.model.Ulimit;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
 import org.testcontainers.containers.FixedHostPortGenericContainer;
 import org.testcontainers.containers.GenericContainer;
-import org.testcontainers.containers.KafkaContainer;
-import org.testcontainers.containers.Network;
 import org.testcontainers.junit.jupiter.Container;
 import org.testcontainers.junit.jupiter.Testcontainers;

 @SuppressWarnings({ "deprecation", "PMD.TestClassWithoutTestCases" })
 @Testcontainers
-class AbstractIntegrationTest {
+class AbstractIntegrationTest extends KafkaIntegrationTestBase {
     protected final String testTopic0;
     protected final String testTopic1;

-    private AdminClient adminClient;
-    private ConnectRunner connectRunner;
+    private KafkaManager kafkaManager;
+
     private KafkaProducer<byte[], byte[]> producer;

     protected static final int OFFSET_FLUSH_INTERVAL_MS = 5000;
@@ -85,6 +83,7 @@ class AbstractIntegrationTest {
     private static final String AZURE_ENDPOINT = "http://127.0.0.1:10000";
     private static final String ACCOUNT_NAME = "devstoreaccount1";
     private static final String ACCOUNT_KEY = "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==";
+    private static final Set<String> CONNECTOR_NAMES = new HashSet<>();

     @Container
     private static final GenericContainer<?> AZURITE_CONTAINER = new FixedHostPortGenericContainer<>( // NOPMD
@@ -94,15 +93,9 @@ class AbstractIntegrationTest {
             .withFixedExposedPort(AZURE_TABLE_PORT, AZURE_TABLE_PORT)
             .withCommand("azurite --blobHost 0.0.0.0 --queueHost 0.0.0.0 --tableHost 0.0.0.0")
             .withReuse(true);
-    @Container
-    protected static final KafkaContainer KAFKA = new KafkaContainer("7.1.0")
-            .withEnv("KAFKA_AUTO_CREATE_TOPICS_ENABLE", "false")
-            .withNetwork(Network.newNetwork())
-            .withExposedPorts(KafkaContainer.KAFKA_PORT, 9092)
-            .withCreateContainerCmdModifier(
-                    cmd -> cmd.getHostConfig().withUlimits(List.of(new Ulimit("nofile", 30_000L, 30_000L))));

     protected AbstractIntegrationTest() {
+        super();
         testTopic0 = "test-topic-0-" + UUID.randomUUID();
         testTopic1 = "test-topic-1-" + UUID.randomUUID();
     }
@@ -144,13 +137,16 @@ static void setUpAll() throws IOException, InterruptedException {
         assert process.waitFor() == 0;
     }

+    @BeforeEach
+    void setupKafka() throws IOException {
+        kafkaManager = setupKafka(true, AzureBlobSourceConnector.class);
+    }
+
     @AfterEach
     void tearDown() {
-        connectRunner.stop();
-        adminClient.close();
         producer.close();
         testBlobAccessor.clear(azurePrefix);
-        connectRunner.awaitStop();
+        CONNECTOR_NAMES.forEach(kafkaManager::deleteConnector);
     }

     protected BlobContainerAsyncClient getAsyncContainerClient() {
@@ -197,27 +193,20 @@ protected Future<RecordMetadata> sendMessageAsync(final String topicName, final
         return producer.send(msg);
     }

-    protected ConnectRunner getConnectRunner() {
-        return connectRunner;
-    }
-
     protected void startConnectRunner(final Map<String, Object> testSpecificProducerProperties)
             throws ExecutionException, InterruptedException {
         testBlobAccessor.clear(azurePrefix);

-        final Properties adminClientConfig = new Properties();
-        adminClientConfig.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA.getBootstrapServers());
-        adminClient = AdminClient.create(adminClientConfig);
+        kafkaManager.createTopics(Arrays.asList(testTopic0, testTopic1));

         final Map<String, Object> producerProps = new HashMap<>(testSpecificProducerProperties);
-        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA.getBootstrapServers());
+        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaManager.bootstrapServers());
         producer = new KafkaProducer<>(producerProps);

-        final NewTopic newTopic0 = new NewTopic(testTopic0, 4, (short) 1);
-        final NewTopic newTopic1 = new NewTopic(testTopic1, 4, (short) 1);
-        adminClient.createTopics(Arrays.asList(newTopic0, newTopic1)).all().get();
+    }

-        connectRunner = new ConnectRunner(pluginDir, KAFKA.getBootstrapServers(), OFFSET_FLUSH_INTERVAL_MS);
-        connectRunner.start();
+    protected void createConnector(final Map<String, String> connectorConfig) {
+        CONNECTOR_NAMES.add(connectorConfig.get("name"));
+        kafkaManager.configureConnector(connectorConfig.get("name"), connectorConfig);
     }
 }
diff --git a/azure-source-connector/src/integration-test/java/aiven/kafka/connect/azure/source/AzureBlobClientIntegrationTest.java b/azure-source-connector/src/integration-test/java/aiven/kafka/connect/azure/source/AzureBlobClientIntegrationTest.java
index 1c892d71c..d33bce2c1 100644
--- a/azure-source-connector/src/integration-test/java/aiven/kafka/connect/azure/source/AzureBlobClientIntegrationTest.java
+++ b/azure-source-connector/src/integration-test/java/aiven/kafka/connect/azure/source/AzureBlobClientIntegrationTest.java
@@ -47,7 +47,7 @@ final class AzureBlobClientIntegrationTest extends AbstractIntegrationTest

         final Map<String, Object> producerProps = new HashMap<>();
-        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA.getBootstrapServers());
+        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, getKafkaManager().bootstrapServers());
         producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                 "org.apache.kafka.common.serialization.ByteArraySerializer");
         producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
diff --git a/azure-source-connector/src/integration-test/java/aiven/kafka/connect/azure/source/ConnectRunner.java b/azure-source-connector/src/integration-test/java/aiven/kafka/connect/azure/source/ConnectRunner.java
deleted file mode 100644
index 4cda76e85..000000000
--- a/azure-source-connector/src/integration-test/java/aiven/kafka/connect/azure/source/ConnectRunner.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Copyright 2020 Aiven Oy
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package aiven.kafka.connect.azure.source;
-
-import java.io.File;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.ExecutionException;
-
-import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.connect.runtime.Connect;
-import org.apache.kafka.connect.runtime.ConnectorConfig;
-import org.apache.kafka.connect.runtime.Herder;
-import org.apache.kafka.connect.runtime.Worker;
-import org.apache.kafka.connect.runtime.isolation.Plugins;
-import org.apache.kafka.connect.runtime.rest.RestServer;
-import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
-import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
-import org.apache.kafka.connect.runtime.standalone.StandaloneHerder;
-import org.apache.kafka.connect.storage.MemoryOffsetBackingStore;
-import org.apache.kafka.connect.util.Callback;
-import org.apache.kafka.connect.util.FutureCallback;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-final class ConnectRunner {
-    private static final Logger LOG = LoggerFactory.getLogger(ConnectRunner.class);
-
-    private final File pluginDir;
-    private final String bootstrapServers;
-    private final int offsetFlushInterval;
-
-    private Herder herder;
-    private Connect connect;
-
-    public ConnectRunner(final File pluginDir, final String bootstrapServers, final int offsetFlushIntervalMs) {
-        this.pluginDir = pluginDir;
-        this.bootstrapServers = bootstrapServers;
-        this.offsetFlushInterval = offsetFlushIntervalMs;
-    }
-
-    void start() {
-        final Map<String, String> workerProps = new HashMap<>();
-        workerProps.put("bootstrap.servers", bootstrapServers);
-
-        workerProps.put("offset.flush.interval.ms", Integer.toString(offsetFlushInterval));
-
-        // These don't matter much (each connector sets its own converters), but need to be filled with valid classes.
-        workerProps.put("key.converter", "org.apache.kafka.connect.converters.ByteArrayConverter");
-        workerProps.put("value.converter", "org.apache.kafka.connect.converters.ByteArrayConverter");
-        workerProps.put("internal.key.converter", "org.apache.kafka.connect.json.JsonConverter");
-        workerProps.put("internal.key.converter.schemas.enable", "false");
-        workerProps.put("internal.value.converter", "org.apache.kafka.connect.json.JsonConverter");
-        workerProps.put("internal.value.converter.schemas.enable", "false");
-
-        // Don't need it since we'll use MemoryOffsetBackingStore.
-        workerProps.put("offset.storage.file.filename", "");
-
-        workerProps.put("plugin.path", pluginDir.getPath());
-
-        final Time time = Time.SYSTEM;
-        final String workerId = "test-worker";
-        final String kafkaClusterId = "test-cluster";
-
-        final Plugins plugins = new Plugins(workerProps);
-        final StandaloneConfig config = new StandaloneConfig(workerProps);
-        final Worker worker = new Worker(workerId, time, plugins, config, new MemoryOffsetBackingStore());
-        herder = new StandaloneHerder(worker, kafkaClusterId);
-
-        final RestServer rest = new RestServer(config);
-
-        connect = new Connect(herder, rest);
-        connect.start();
-    }
-
-    void createConnector(final Map<String, String> config) throws ExecutionException, InterruptedException {
-        assert herder != null;
-
-        final FutureCallback<Herder.Created<ConnectorInfo>> callback = new FutureCallback<>(
-                new Callback<Herder.Created<ConnectorInfo>>() {
-                    @Override
-                    public void onCompletion(final Throwable error, final Herder.Created<ConnectorInfo> info) {
-                        if (error == null) {
-                            LOG.info("Created connector {}", info.result().name());
-                        } else {
-                            LOG.error("Failed to create job");
-                        }
-                    }
-                });
-        herder.putConnectorConfig(config.get(ConnectorConfig.NAME_CONFIG), config, false, callback);
-
-        final Herder.Created<ConnectorInfo> connectorInfoCreated = callback.get();
-        assert connectorInfoCreated.created();
-    }
-
-    void stop() {
-        connect.stop();
-    }
-
-    void awaitStop() {
-        connect.awaitStop();
-    }
-}