Commit 99a09f8

Migrate gcs sink (#531)
Requires #528. Migrates the GCS sink integration tests. You should be able to execute `../gradlew integrationTest` from the gcs-sink directory. **Warning:** tests take 20-25 minutes to execute.
1 parent: 5ea93d3

9 files changed: +60 −323 lines

gcs-sink-connector/build.gradle.kts
Lines changed: 1 addition & 0 deletions

@@ -139,6 +139,7 @@ dependencies {

     testRuntimeOnly(logginglibs.slf4j.log4j12)

+    integrationTestImplementation(testFixtures(project(":commons")))
     integrationTestImplementation(testinglibs.wiremock)
     integrationTestImplementation(testcontainers.junit.jupiter)
     integrationTestImplementation(testcontainers.kafka) // this is not Kafka version
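
(The new test-fixtures dependency on `:commons` is presumably what supplies the `io.aiven.commons.kafka.testkit` classes, `KafkaIntegrationTestBase` and `KafkaManager`, that the migrated tests below import.)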

gcs-sink-connector/src/integration-test/java/io/aiven/kafka/connect/gcs/AbstractIntegrationTest.java
Lines changed: 23 additions & 34 deletions

@@ -27,16 +27,15 @@
 import java.time.format.DateTimeFormatter;
 import java.util.Arrays;
 import java.util.HashMap;
-import java.util.List;
+import java.util.HashSet;
 import java.util.Map;
-import java.util.Properties;
+import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;

-import org.apache.kafka.clients.admin.AdminClient;
-import org.apache.kafka.clients.admin.AdminClientConfig;
-import org.apache.kafka.clients.admin.NewTopic;
+import io.aiven.commons.kafka.testkit.KafkaIntegrationTestBase;
+import io.aiven.commons.kafka.testkit.KafkaManager;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
@@ -45,28 +44,26 @@
 import io.aiven.kafka.connect.common.config.CompressionType;
 import io.aiven.kafka.connect.gcs.testutils.BucketAccessor;

-import com.github.dockerjava.api.model.Ulimit;
 import com.google.cloud.NoCredentials;
 import com.google.cloud.storage.BucketInfo;
 import com.google.cloud.storage.Storage;
 import com.google.cloud.storage.StorageOptions;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
 import org.testcontainers.containers.FixedHostPortGenericContainer;
 import org.testcontainers.containers.GenericContainer;
-import org.testcontainers.containers.KafkaContainer;
-import org.testcontainers.containers.Network;
 import org.testcontainers.junit.jupiter.Container;
 import org.testcontainers.junit.jupiter.Testcontainers;

 @SuppressWarnings({ "deprecation", "PMD.TestClassWithoutTestCases" })
 @Testcontainers
-class AbstractIntegrationTest<K, V> {
+class AbstractIntegrationTest<K, V> extends KafkaIntegrationTestBase {
     protected final String testTopic0;
     protected final String testTopic1;

-    private AdminClient adminClient;
-    private ConnectRunner connectRunner;
+    private KafkaManager kafkaManager;
+
     private KafkaProducer<K, V> producer;

     protected static final int OFFSET_FLUSH_INTERVAL_MS = 5000;
@@ -89,20 +86,14 @@ class AbstractIntegrationTest<K, V> {
     protected static String gcsEndpoint; // NOPMD mutable static state

     private static final String FAKE_GCS_SERVER_VERSION = System.getProperty("fake-gcs-server-version", "latest");
+    private static final Set<String> CONNECTOR_NAMES = new HashSet<>();
     @Container
     @SuppressWarnings("rawtypes")
     private static final GenericContainer<?> FAKE_GCS_CONTAINER = new FixedHostPortGenericContainer(
             String.format("fsouza/fake-gcs-server:%s", FAKE_GCS_SERVER_VERSION))
             .withFixedExposedPort(GCS_PORT, GCS_PORT)
             .withCommand("-port", Integer.toString(GCS_PORT), "-scheme", "http")
             .withReuse(true);
-    @Container
-    protected static final KafkaContainer KAFKA = new KafkaContainer("5.2.1")
-            .withEnv("KAFKA_AUTO_CREATE_TOPICS_ENABLE", "false")
-            .withNetwork(Network.newNetwork())
-            .withExposedPorts(KafkaContainer.KAFKA_PORT, 9092)
-            .withCreateContainerCmdModifier(
-                    cmd -> cmd.getHostConfig().withUlimits(List.of(new Ulimit("nofile", 30_000L, 30_000L))));

     static int getRandomPort() {
         try (ServerSocket socket = new ServerSocket(0)) {
@@ -114,6 +105,7 @@ static int getRandomPort() {
     }

     protected AbstractIntegrationTest() {
+        super();
         testTopic0 = "test-topic-0-" + UUID.randomUUID();
         testTopic1 = "test-topic-1-" + UUID.randomUUID();
     }
@@ -162,13 +154,17 @@ static void setUpAll() throws IOException, InterruptedException {
         assert process.waitFor() == 0;
     }

+    @BeforeEach
+    void setupKafka() throws IOException {
+        kafkaManager = setupKafka(true, GcsSinkConnector.class);
+    }
+
     @AfterEach
     void tearDown() {
-        connectRunner.stop();
-        adminClient.close();
         producer.close();
         testBucketAccessor.clear(gcsPrefix);
-        connectRunner.awaitStop();
+        CONNECTOR_NAMES.forEach(kafkaManager::deleteConnector);
+        CONNECTOR_NAMES.clear();
     }

     protected static boolean useFakeGCS() {
@@ -206,27 +202,20 @@ protected Future<RecordMetadata> sendMessageAsync(final String topicName, final
         return producer.send(msg);
     }

-    protected ConnectRunner getConnectRunner() {
-        return connectRunner;
-    }
-
     protected void startConnectRunner(final Map<String, Object> testSpecificProducerProperties)
             throws ExecutionException, InterruptedException {
         testBucketAccessor.clear(gcsPrefix);

-        final Properties adminClientConfig = new Properties();
-        adminClientConfig.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA.getBootstrapServers());
-        adminClient = AdminClient.create(adminClientConfig);
+        kafkaManager.createTopics(Arrays.asList(testTopic0, testTopic1));

         final Map<String, Object> producerProps = new HashMap<>(testSpecificProducerProperties);
-        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA.getBootstrapServers());
+        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaManager.bootstrapServers());
         producer = new KafkaProducer<>(producerProps);

-        final NewTopic newTopic0 = new NewTopic(testTopic0, 4, (short) 1);
-        final NewTopic newTopic1 = new NewTopic(testTopic1, 4, (short) 1);
-        adminClient.createTopics(Arrays.asList(newTopic0, newTopic1)).all().get();
+    }

-        connectRunner = new ConnectRunner(pluginDir, KAFKA.getBootstrapServers(), OFFSET_FLUSH_INTERVAL_MS);
-        connectRunner.start();
+    protected void createConnector(final Map<String, String> connectorConfig) {
+        CONNECTOR_NAMES.add(connectorConfig.get("name"));
+        kafkaManager.configureConnector(connectorConfig.get("name"), connectorConfig);
     }
 }
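
For orientation, here is a minimal sketch of what a test class looks like against the migrated base class, using only members visible in this commit (`startConnectRunner`, `createConnector`, and `getKafkaManager()` as used by the Avro tests below, where `getKafkaManager()` is assumed to come from `KafkaIntegrationTestBase`). The class name, connector name, and test body are hypothetical, and a real configuration would also need the `gcs.*` settings:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.producer.ProducerConfig;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

// Hypothetical subclass, for illustration only; not part of this commit.
final class ExampleIntegrationTest extends AbstractIntegrationTest<String, String> {

    @BeforeEach
    void setUp() throws ExecutionException, InterruptedException {
        final Map<String, Object> producerProps = new HashMap<>();
        // Bootstrap servers now come from KafkaManager (set up in the base
        // class's @BeforeEach) instead of a test-owned KafkaContainer.
        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                getKafkaManager().bootstrapServers());
        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        // Creates testTopic0/testTopic1 through kafkaManager.createTopics(...)
        // and builds the KafkaProducer.
        startConnectRunner(producerProps);
    }

    @Test
    void example() {
        final Map<String, String> connectorConfig = new HashMap<>();
        connectorConfig.put("name", "example-gcs-sink"); // hypothetical name
        connectorConfig.put("connector.class", GcsSinkConnector.class.getName());
        connectorConfig.put("tasks.max", "1");
        // ...plus the gcs.* settings a real test supplies.
        // Records the name in CONNECTOR_NAMES, so tearDown() later removes the
        // connector through kafkaManager::deleteConnector.
        createConnector(connectorConfig);
        // Produce records with the base-class sendMessageAsync helper, then
        // assert on the output blobs through testBucketAccessor.
    }
}
```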

gcs-sink-connector/src/integration-test/java/io/aiven/kafka/connect/gcs/AvroIntegrationTest.java
Lines changed: 8 additions & 12 deletions

@@ -50,30 +50,26 @@
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.MethodSource;
-import org.testcontainers.junit.jupiter.Container;
 import org.testcontainers.junit.jupiter.Testcontainers;

 @SuppressWarnings("PMD.AvoidInstantiatingObjectsInLoops")
 @Testcontainers
 final class AvroIntegrationTest extends AbstractIntegrationTest<String, GenericRecord> {
     private static final String CONNECTOR_NAME = "aiven-gcs-sink-connector-avro";

-    @Container
-    private final SchemaRegistryContainer schemaRegistry = new SchemaRegistryContainer(KAFKA);
-
     private final Schema avroInputDataSchema = new Schema.Parser().parse(
             "{\"type\":\"record\",\"name\":\"input_data\"," + "\"fields\":[{\"name\":\"name\",\"type\":\"string\"}]}");

     @BeforeEach
     void setUp() throws ExecutionException, InterruptedException {
         testBucketAccessor.clear(gcsPrefix);
         final Map<String, Object> producerProps = new HashMap<>();
-        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA.getBootstrapServers());
+        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, getKafkaManager().bootstrapServers());
         producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                 "io.confluent.kafka.serializers.KafkaAvroSerializer");
         producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                 "io.confluent.kafka.serializers.KafkaAvroSerializer");
-        producerProps.put("schema.registry.url", schemaRegistry.getSchemaRegistryUrl());
+        producerProps.put("schema.registry.url", getKafkaManager().getSchemaRegistryUrl());
         startConnectRunner(producerProps);
     }

@@ -101,7 +97,7 @@ void avroOutput() throws ExecutionException, InterruptedException, IOException {
         final Map<String, String> connectorConfig = basicConnectorConfig();
         connectorConfig.put("format.output.fields", "key,value");
         connectorConfig.put("format.output.type", "avro");
-        getConnectRunner().createConnector(connectorConfig);
+        createConnector(connectorConfig);

         final int recordCountPerPartition = 10;
         produceRecords(recordCountPerPartition);
@@ -174,7 +170,7 @@ void avroOutputPlainValueWithoutEnvelope(final String avroCodec, final Compressi
         connectorConfig.put("format.output.type", "avro");
         connectorConfig.put("file.compression.type", compression.name);
         connectorConfig.put("avro.codec", avroCodec);
-        getConnectRunner().createConnector(connectorConfig);
+        createConnector(connectorConfig);

         final int recordCountPerPartition = 10;
         produceRecords(recordCountPerPartition);
@@ -231,7 +227,7 @@ void schemaChanged() throws ExecutionException, InterruptedException, IOExceptio
         connectorConfig.put("format.output.fields", "value");
         connectorConfig.put("format.output.fields.value.encoding", "none");
         connectorConfig.put("format.output.type", "avro");
-        getConnectRunner().createConnector(connectorConfig);
+        createConnector(connectorConfig);

         final Schema evolvedAvroInputDataSchema = new Schema.Parser()
                 .parse("{\"type\":\"record\",\"name\":\"input_data\","
@@ -286,7 +282,7 @@ void jsonlOutput() throws ExecutionException, InterruptedException {
         connectorConfig.put("format.output.fields.value.encoding", "none");
         connectorConfig.put("file.compression.type", compression);
         connectorConfig.put("format.output.type", "jsonl");
-        getConnectRunner().createConnector(connectorConfig);
+        createConnector(connectorConfig);

         final int recordCountPerPartition = 10;
         produceRecords(recordCountPerPartition);
@@ -323,9 +319,9 @@ private Map<String, String> basicConnectorConfig() {
         config.put("name", CONNECTOR_NAME);
         config.put("connector.class", GcsSinkConnector.class.getName());
         config.put("key.converter", "io.confluent.connect.avro.AvroConverter");
-        config.put("key.converter.schema.registry.url", schemaRegistry.getSchemaRegistryUrl());
+        config.put("key.converter.schema.registry.url", getKafkaManager().getSchemaRegistryUrl());
         config.put("value.converter", "io.confluent.connect.avro.AvroConverter");
-        config.put("value.converter.schema.registry.url", schemaRegistry.getSchemaRegistryUrl());
+        config.put("value.converter.schema.registry.url", getKafkaManager().getSchemaRegistryUrl());
         config.put("tasks.max", "1");
         if (gcsCredentialsPath != null) {
             config.put("gcs.credentials.path", gcsCredentialsPath);

gcs-sink-connector/src/integration-test/java/io/aiven/kafka/connect/gcs/AvroParquetIntegrationTest.java
Lines changed: 11 additions & 14 deletions

@@ -30,6 +30,7 @@
 import java.util.concurrent.Future;
 import java.util.stream.Collectors;

+import io.aiven.kafka.connect.common.format.ParquetTestDataFixture;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.RecordMetadata;

@@ -40,27 +41,23 @@
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
-import org.testcontainers.junit.jupiter.Container;
 import org.testcontainers.junit.jupiter.Testcontainers;

 @Testcontainers
 final class AvroParquetIntegrationTest extends AbstractIntegrationTest<String, GenericRecord> {

     private static final String CONNECTOR_NAME = "aiven-gcs-sink-connector-parquet";

-    @Container
-    private final SchemaRegistryContainer schemaRegistry = new SchemaRegistryContainer(KAFKA);
-
     @BeforeEach
     void setUp() throws ExecutionException, InterruptedException {
         testBucketAccessor.clear(gcsPrefix);
         final Map<String, Object> producerProps = new HashMap<>();
-        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA.getBootstrapServers());
+        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, getKafkaManager().bootstrapServers());
         producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                 "io.confluent.kafka.serializers.KafkaAvroSerializer");
         producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                 "io.confluent.kafka.serializers.KafkaAvroSerializer");
-        producerProps.put("schema.registry.url", schemaRegistry.getSchemaRegistryUrl());
+        producerProps.put("schema.registry.url", getKafkaManager().getSchemaRegistryUrl());
         startConnectRunner(producerProps);
     }

@@ -70,7 +67,7 @@ void allOutputFields(@TempDir final Path tmpDir) throws ExecutionException, Inte
         final Map<String, String> connectorConfig = basicConnectorConfig(compression);
         connectorConfig.put("format.output.fields", "key,value,offset,timestamp,headers");
         connectorConfig.put("format.output.fields.value.encoding", "none");
-        getConnectRunner().createConnector(connectorConfig);
+        createConnector(connectorConfig);

         final Schema valueSchema = SchemaBuilder.record("value")
                 .fields()
@@ -109,7 +106,7 @@ void allOutputFields(@TempDir final Path tmpDir) throws ExecutionException, Inte

         final Map<String, List<GenericRecord>> blobContents = new HashMap<>();
         for (final String blobName : expectedBlobs) {
-            final var records = ParquetUtils.readRecords(tmpDir.resolve(Paths.get(blobName)),
+            final var records = ParquetTestDataFixture.readRecords(tmpDir.resolve(Paths.get(blobName)),
                     testBucketAccessor.readBytes(blobName));
             blobContents.put(blobName, records);
         }
@@ -138,7 +135,7 @@ void valueComplexType(@TempDir final Path tmpDir) throws ExecutionException, Int
         final Map<String, String> connectorConfig = basicConnectorConfig(compression);
         connectorConfig.put("format.output.fields", "value");
         connectorConfig.put("format.output.fields.value.encoding", "none");
-        getConnectRunner().createConnector(connectorConfig);
+        createConnector(connectorConfig);

         final Schema valueSchema = SchemaBuilder.record("value")
                 .fields()
@@ -177,7 +174,7 @@ void valueComplexType(@TempDir final Path tmpDir) throws ExecutionException, Int

         final Map<String, List<GenericRecord>> blobContents = new HashMap<>();
         for (final String blobName : expectedBlobs) {
-            final var records = ParquetUtils.readRecords(tmpDir.resolve(Paths.get(blobName)),
+            final var records = ParquetTestDataFixture.readRecords(tmpDir.resolve(Paths.get(blobName)),
                     testBucketAccessor.readBytes(blobName));
             blobContents.put(blobName, records);
         }
@@ -202,7 +199,7 @@ void schemaChanged(@TempDir final Path tmpDir) throws ExecutionException, Interr
         final Map<String, String> connectorConfig = basicConnectorConfig(compression);
         connectorConfig.put("format.output.fields", "value");
         connectorConfig.put("format.output.fields.value.encoding", "none");
-        getConnectRunner().createConnector(connectorConfig);
+        createConnector(connectorConfig);

         final Schema valueSchema = SchemaBuilder.record("value")
                 .fields()
@@ -268,7 +265,7 @@ void schemaChanged(@TempDir final Path tmpDir) throws ExecutionException, Interr

         final var blobContents = new ArrayList<String>();
         for (final String blobName : expectedBlobs) {
-            final var records = ParquetUtils.readRecords(tmpDir.resolve(Paths.get(blobName)),
+            final var records = ParquetTestDataFixture.readRecords(tmpDir.resolve(Paths.get(blobName)),
                     testBucketAccessor.readBytes(blobName));
             blobContents.addAll(records.stream().map(r -> r.get("value").toString()).collect(Collectors.toList()));
         }
@@ -280,9 +277,9 @@ private Map<String, String> basicConnectorConfig(final String compression) {
         config.put("name", CONNECTOR_NAME);
         config.put("connector.class", GcsSinkConnector.class.getName());
         config.put("key.converter", "io.confluent.connect.avro.AvroConverter");
-        config.put("key.converter.schema.registry.url", schemaRegistry.getSchemaRegistryUrl());
+        config.put("key.converter.schema.registry.url", getKafkaManager().getSchemaRegistryUrl());
         config.put("value.converter", "io.confluent.connect.avro.AvroConverter");
-        config.put("value.converter.schema.registry.url", schemaRegistry.getSchemaRegistryUrl());
+        config.put("value.converter.schema.registry.url", getKafkaManager().getSchemaRegistryUrl());
         config.put("tasks.max", "1");
         if (gcsCredentialsPath != null) {
             config.put("gcs.credentials.path", gcsCredentialsPath);
