import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.RecordMetadata;

+import io.aiven.kafka.connect.common.format.ParquetTestDataFixture;
+
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
-import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;

@Testcontainers
final class AvroParquetIntegrationTest extends AbstractIntegrationTest<String, GenericRecord> {

    private static final String CONNECTOR_NAME = "aiven-azure-sink-connector-parquet";

-    @Container
-    private final SchemaRegistryContainer schemaRegistry = new SchemaRegistryContainer(KAFKA);
-
    @BeforeEach
    void setUp() throws ExecutionException, InterruptedException {
        testBlobAccessor.clear(azurePrefix);
        final Map<String, Object> producerProps = new HashMap<>();
-        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA.getBootstrapServers());
+        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, getKafkaManager().bootstrapServers());
        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "io.confluent.kafka.serializers.KafkaAvroSerializer");
        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "io.confluent.kafka.serializers.KafkaAvroSerializer");
-        producerProps.put("schema.registry.url", schemaRegistry.getSchemaRegistryUrl());
+        producerProps.put("schema.registry.url", getKafkaManager().getSchemaRegistryUrl());
        startConnectRunner(producerProps);
    }

@@ -70,7 +68,7 @@ void allOutputFields(@TempDir final Path tmpDir) throws ExecutionException, Inte
        final Map<String, String> connectorConfig = basicConnectorConfig(compression);
        connectorConfig.put(AzureBlobSinkConfig.FORMAT_OUTPUT_FIELDS_CONFIG, "key,value,offset,timestamp,headers");
        connectorConfig.put(AzureBlobSinkConfig.FORMAT_OUTPUT_FIELDS_VALUE_ENCODING_CONFIG, "none");
-        getConnectRunner().createConnector(connectorConfig);
+        createConnector(connectorConfig);

        final Schema valueSchema = SchemaBuilder.record("value")
                .fields()
@@ -109,7 +107,7 @@ void allOutputFields(@TempDir final Path tmpDir) throws ExecutionException, Inte

        final Map<String, List<GenericRecord>> blobContents = new HashMap<>();
        for (final String blobName : expectedBlobs) {
-            final var records = ParquetUtils.readRecords(tmpDir.resolve(Paths.get(blobName)),
+            final var records = ParquetTestDataFixture.readRecords(tmpDir.resolve(Paths.get(blobName)),
                    testBlobAccessor.readBytes(blobName));
            blobContents.put(blobName, records);
        }
@@ -138,7 +136,7 @@ void valueComplexType(@TempDir final Path tmpDir) throws ExecutionException, Int
        final Map<String, String> connectorConfig = basicConnectorConfig(compression);
        connectorConfig.put(AzureBlobSinkConfig.FORMAT_OUTPUT_FIELDS_CONFIG, "value");
        connectorConfig.put(AzureBlobSinkConfig.FORMAT_OUTPUT_FIELDS_VALUE_ENCODING_CONFIG, "none");
-        getConnectRunner().createConnector(connectorConfig);
+        createConnector(connectorConfig);

        final Schema valueSchema = SchemaBuilder.record("value")
                .fields()
@@ -177,7 +175,7 @@ void valueComplexType(@TempDir final Path tmpDir) throws ExecutionException, Int

        final Map<String, List<GenericRecord>> blobContents = new HashMap<>();
        for (final String blobName : expectedBlobs) {
-            final var records = ParquetUtils.readRecords(tmpDir.resolve(Paths.get(blobName)),
+            final var records = ParquetTestDataFixture.readRecords(tmpDir.resolve(Paths.get(blobName)),
                    testBlobAccessor.readBytes(blobName));
            blobContents.put(blobName, records);
        }
@@ -202,7 +200,7 @@ void schemaChanged(@TempDir final Path tmpDir) throws ExecutionException, Interr
        final Map<String, String> connectorConfig = basicConnectorConfig(compression);
        connectorConfig.put(AzureBlobSinkConfig.FORMAT_OUTPUT_FIELDS_CONFIG, "value");
        connectorConfig.put(AzureBlobSinkConfig.FORMAT_OUTPUT_FIELDS_VALUE_ENCODING_CONFIG, "none");
-        getConnectRunner().createConnector(connectorConfig);
+        createConnector(connectorConfig);

        final Schema valueSchema = SchemaBuilder.record("value")
                .fields()
@@ -268,7 +266,7 @@ void schemaChanged(@TempDir final Path tmpDir) throws ExecutionException, Interr

        final var blobContents = new ArrayList<String>();
        for (final String blobName : expectedBlobs) {
-            final var records = ParquetUtils.readRecords(tmpDir.resolve(Paths.get(blobName)),
+            final var records = ParquetTestDataFixture.readRecords(tmpDir.resolve(Paths.get(blobName)),
                    testBlobAccessor.readBytes(blobName));
            blobContents.addAll(records.stream().map(r -> r.get("value").toString()).collect(Collectors.toList()));
        }
@@ -280,9 +278,9 @@ private Map<String, String> basicConnectorConfig(final String compression) {
        config.put(AzureBlobSinkConfig.NAME_CONFIG, CONNECTOR_NAME);
        config.put("connector.class", AzureBlobSinkConnector.class.getName());
        config.put("key.converter", "io.confluent.connect.avro.AvroConverter");
-        config.put("key.converter.schema.registry.url", schemaRegistry.getSchemaRegistryUrl());
+        config.put("key.converter.schema.registry.url", getKafkaManager().getSchemaRegistryUrl());
        config.put("value.converter", "io.confluent.connect.avro.AvroConverter");
-        config.put("value.converter.schema.registry.url", schemaRegistry.getSchemaRegistryUrl());
+        config.put("value.converter.schema.registry.url", getKafkaManager().getSchemaRegistryUrl());
        config.put("tasks.max", "1");
        if (useFakeAzure()) {
            config.put(AzureBlobSinkConfig.AZURE_STORAGE_CONNECTION_STRING_CONFIG, azureEndpoint);
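For reference, here is a minimal, self-contained sketch of the producer wiring that the setUp() hunk above assembles: an Avro GenericRecord is sent through KafkaAvroSerializer, and the blocking get() yields the RecordMetadata (imported at the top of the test) whose partition and offset the assertions later use to derive expected blob names. The localhost endpoints, topic name, and class name below are placeholders for illustration only; in the test they come from getKafkaManager() and the AbstractIntegrationTest base class.

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

final class AvroProducerSketch {
    public static void main(final String[] args) throws ExecutionException, InterruptedException {
        // Placeholder endpoints; the test obtains these from getKafkaManager().
        final Map<String, Object> producerProps = new HashMap<>();
        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "io.confluent.kafka.serializers.KafkaAvroSerializer");
        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "io.confluent.kafka.serializers.KafkaAvroSerializer");
        producerProps.put("schema.registry.url", "http://localhost:8081");

        // Same SchemaBuilder pattern the test uses for its value records.
        final Schema valueSchema = SchemaBuilder.record("value")
                .fields()
                .name("name").type().stringType().noDefault()
                .endRecord();

        try (KafkaProducer<String, GenericRecord> producer = new KafkaProducer<>(producerProps)) {
            final GenericRecord value = new GenericData.Record(valueSchema);
            value.put("name", "user-0");
            // send() is asynchronous; get() blocks until the broker acks and
            // returns the partition/offset the record landed on.
            final RecordMetadata metadata =
                    producer.send(new ProducerRecord<>("test-topic", "key-0", value)).get();
            System.out.printf("wrote to %s-%d@%d%n",
                    metadata.topic(), metadata.partition(), metadata.offset());
        }
    }
}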