Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion azure-source-connector/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ dependencies {
compileOnly("org.apache.velocity:velocity-engine-core:2.4.1")
compileOnly("org.apache.velocity.tools:velocity-tools-generic:3.1")

implementation("commons-io:commons-io:2.19.0")
implementation("commons-io:commons-io:2.18.0")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this deliberate?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

neat

implementation("org.apache.commons:commons-lang3:3.17.0")
implementation(project(":commons"))
implementation(apache.commons.collection4)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,44 +26,42 @@
import java.time.format.DateTimeFormatter;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.HashSet;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import io.aiven.commons.kafka.testkit.KafkaIntegrationTestBase;
import io.aiven.commons.kafka.testkit.KafkaManager;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import io.aiven.kafka.connect.azure.source.AzureBlobSourceConnector;
import io.aiven.kafka.connect.common.config.CompressionType;

import com.azure.storage.blob.BlobContainerAsyncClient;
import com.azure.storage.blob.BlobServiceClient;
import com.azure.storage.blob.BlobServiceClientBuilder;
import com.github.dockerjava.api.model.Ulimit;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.testcontainers.containers.FixedHostPortGenericContainer;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;

@SuppressWarnings({ "deprecation", "PMD.TestClassWithoutTestCases" })
@Testcontainers
class AbstractIntegrationTest<K, V> {
class AbstractIntegrationTest<K, V> extends KafkaIntegrationTestBase {
protected final String testTopic0;
protected final String testTopic1;

private AdminClient adminClient;
private ConnectRunner connectRunner;
private KafkaManager kafkaManager;

private KafkaProducer<K, V> producer;

protected static final int OFFSET_FLUSH_INTERVAL_MS = 5000;
Expand All @@ -85,6 +83,7 @@ class AbstractIntegrationTest<K, V> {
private static final String AZURE_ENDPOINT = "http://127.0.0.1:10000";
private static final String ACCOUNT_NAME = "devstoreaccount1";
private static final String ACCOUNT_KEY = "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==";
private static final Set<String> CONNECTOR_NAMES = new HashSet<>();

@Container
private static final GenericContainer<?> AZURITE_CONTAINER = new FixedHostPortGenericContainer<>( // NOPMD
Expand All @@ -94,15 +93,9 @@ class AbstractIntegrationTest<K, V> {
.withFixedExposedPort(AZURE_TABLE_PORT, AZURE_TABLE_PORT)
.withCommand("azurite --blobHost 0.0.0.0 --queueHost 0.0.0.0 --tableHost 0.0.0.0")
.withReuse(true);
@Container
protected static final KafkaContainer KAFKA = new KafkaContainer("7.1.0")
.withEnv("KAFKA_AUTO_CREATE_TOPICS_ENABLE", "false")
.withNetwork(Network.newNetwork())
.withExposedPorts(KafkaContainer.KAFKA_PORT, 9092)
.withCreateContainerCmdModifier(
cmd -> cmd.getHostConfig().withUlimits(List.of(new Ulimit("nofile", 30_000L, 30_000L))));

protected AbstractIntegrationTest() {
super();
testTopic0 = "test-topic-0-" + UUID.randomUUID();
testTopic1 = "test-topic-1-" + UUID.randomUUID();
}
Expand Down Expand Up @@ -144,13 +137,16 @@ static void setUpAll() throws IOException, InterruptedException {
assert process.waitFor() == 0;
}

@BeforeEach
void setupKafka() throws IOException {
kafkaManager = setupKafka(true, AzureBlobSourceConnector.class);
}

@AfterEach
void tearDown() {
connectRunner.stop();
adminClient.close();
producer.close();
testBlobAccessor.clear(azurePrefix);
connectRunner.awaitStop();
CONNECTOR_NAMES.forEach(kafkaManager::deleteConnector);
}

protected BlobContainerAsyncClient getAsyncContainerClient() {
Expand Down Expand Up @@ -197,27 +193,20 @@ protected Future<RecordMetadata> sendMessageAsync(final String topicName, final
return producer.send(msg);
}

protected ConnectRunner getConnectRunner() {
return connectRunner;
}

protected void startConnectRunner(final Map<String, Object> testSpecificProducerProperties)
throws ExecutionException, InterruptedException {
testBlobAccessor.clear(azurePrefix);

final Properties adminClientConfig = new Properties();
adminClientConfig.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA.getBootstrapServers());
adminClient = AdminClient.create(adminClientConfig);
kafkaManager.createTopics(Arrays.asList(testTopic0, testTopic1));

final Map<String, Object> producerProps = new HashMap<>(testSpecificProducerProperties);
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA.getBootstrapServers());
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaManager.bootstrapServers());
producer = new KafkaProducer<>(producerProps);

final NewTopic newTopic0 = new NewTopic(testTopic0, 4, (short) 1);
final NewTopic newTopic1 = new NewTopic(testTopic1, 4, (short) 1);
adminClient.createTopics(Arrays.asList(newTopic0, newTopic1)).all().get();
}

connectRunner = new ConnectRunner(pluginDir, KAFKA.getBootstrapServers(), OFFSET_FLUSH_INTERVAL_MS);
connectRunner.start();
protected void createConnector(final Map<String, String> connectorConfig) {
CONNECTOR_NAMES.add(connectorConfig.get("name"));
kafkaManager.configureConnector(connectorConfig.get("name"), connectorConfig);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ final class AzureBlobClientIntegrationTest extends AbstractIntegrationTest<byte[
void setUp() throws ExecutionException, InterruptedException {
testBlobAccessor.clear(azurePrefix);
final Map<String, Object> producerProps = new HashMap<>();
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA.getBootstrapServers());
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, getKafkaManager().bootstrapServers());
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.ByteArraySerializer");
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
Expand Down

This file was deleted.

Loading