BigQuerySinkTask.java
@@ -142,6 +142,7 @@ public class BigQuerySinkTask extends SinkTask {
   private boolean allowNewBigQueryFields;
   private boolean useCredentialsProjectId;
   private boolean allowRequiredFieldRelaxation;
+  private boolean allowSchemaUnionization;
 
   /**
    * Create a new BigquerySinkTask.
@@ -469,7 +470,6 @@ private SchemaManager newSchemaManager() {
     Optional<Long> partitionExpiration = config.getPartitionExpirationMs();
     Optional<List<String>> clusteringFieldName = config.getClusteringPartitionFieldNames();
     Optional<TimePartitioning.Type> timePartitioningType = config.getTimePartitioningType();
-    boolean allowSchemaUnionization = config.getBoolean(BigQuerySinkConfig.ALLOW_SCHEMA_UNIONIZATION_CONFIG);
     boolean sanitizeFieldNames = config.getBoolean(BigQuerySinkConfig.SANITIZE_FIELD_NAME_CONFIG);
     return new SchemaManager(schemaRetriever, schemaConverter, getBigQuery(),
         allowNewBigQueryFields, allowRequiredFieldRelaxation, allowSchemaUnionization,
@@ -513,15 +513,20 @@ private Storage getGcs() {
 
   private GcsToBqWriter getGcsWriter() {
     BigQuery bigQuery = getBigQuery();
-    // schemaManager shall only be needed for creating table hence do not fetch instance if not
-    // needed.
-    SchemaManager schemaManager = autoCreateTables ? getSchemaManager() : null;
+    boolean attemptSchemaUpdate = allowNewBigQueryFields
+        || allowRequiredFieldRelaxation
+        || allowSchemaUnionization;
+    // schemaManager shall only be needed for creating table or performing schema updates hence do
+    // not fetch instance if not needed.
+    boolean needsSchemaManager = autoCreateTables || attemptSchemaUpdate;
+    SchemaManager schemaManager = needsSchemaManager ? getSchemaManager() : null;
     return new GcsToBqWriter(getGcs(),
         bigQuery,
         schemaManager,
         retry,
         retryWait,
         autoCreateTables,
+        attemptSchemaUpdate,
         time);
   }
 
@@ -559,6 +564,7 @@ public void start(Map<String, String> properties) {
     retryWait = config.getLong(BigQuerySinkConfig.BIGQUERY_RETRY_WAIT_CONFIG);
     allowNewBigQueryFields = config.getBoolean(BigQuerySinkConfig.ALLOW_NEW_BIGQUERY_FIELDS_CONFIG);
     allowRequiredFieldRelaxation = config.getBoolean(BigQuerySinkConfig.ALLOW_BIGQUERY_REQUIRED_FIELD_RELAXATION_CONFIG);
+    allowSchemaUnionization = config.getBoolean(BigQuerySinkConfig.ALLOW_SCHEMA_UNIONIZATION_CONFIG);
     topicToPartitionTableId = new HashMap<>();
     bigQuery = new AtomicReference<>();
     schemaManager = new AtomicReference<>();
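Taken together with the GcsToBqWriter changes below, the effect is that the GCS batch path now obtains a SchemaManager and attempts schema updates whenever any of the three schema-evolution settings is enabled, even with table auto-creation switched off. Below is a minimal sketch of a sink configuration that exercises the new path; the property keys are the BigQuerySinkConfig constants referenced in this diff, while the topic, table and bucket names are placeholders and converter/credential settings are omitted. It is an illustration, not part of the PR.

import com.wepay.kafka.connect.bigquery.config.BigQuerySinkConfig;
import java.util.HashMap;
import java.util.Map;

final class BatchSchemaEvolutionConfigSketch {
  // Illustrative only: batch loading through GCS with schema updates driven purely by the
  // schema-evolution flags, while table auto-creation stays disabled.
  static Map<String, String> sinkProps() {
    Map<String, String> props = new HashMap<>();
    props.put(BigQuerySinkConfig.ENABLE_BATCH_CONFIG, "example-topic");      // topics routed through GCS batch loads
    props.put(BigQuerySinkConfig.GCS_BUCKET_NAME_CONFIG, "example-bucket");  // staging bucket for the batch files
    props.put(BigQuerySinkConfig.TABLE_CREATE_CONFIG, "false");              // auto-creation no longer required
    props.put(BigQuerySinkConfig.ALLOW_NEW_BIGQUERY_FIELDS_CONFIG, "true");                 // any one of these three
    props.put(BigQuerySinkConfig.ALLOW_BIGQUERY_REQUIRED_FIELD_RELAXATION_CONFIG, "true");  // flags now makes the GCS
    props.put(BigQuerySinkConfig.ALLOW_SCHEMA_UNIONIZATION_CONFIG, "false");                // writer attempt schema updates
    return props;
  }
}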
GcsToBqWriter.java
@@ -41,6 +41,7 @@
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import java.util.SortedMap;
@@ -63,6 +64,7 @@ public class GcsToBqWriter {
   private final Storage storage;
   private final BigQuery bigQuery;
   private final SchemaManager schemaManager;
+  private final boolean attemptSchemaUpdate;
   private final Time time;
   private int retries;
   private long retryWaitMs;
@@ -87,10 +89,12 @@ public GcsToBqWriter(Storage storage,
                        int retries,
                        long retryWaitMs,
                        boolean autoCreateTables,
+                       boolean attemptSchemaUpdate,
                        Time time) {
     this.storage = storage;
     this.bigQuery = bigQuery;
     this.schemaManager = schemaManager;
+    this.attemptSchemaUpdate = attemptSchemaUpdate;
     this.time = time;
 
     this.retries = retries;
@@ -130,6 +134,8 @@ public void writeRows(SortedMap<SinkRecord, RowToInsert> rows,
       timeout = Duration.ofMillis(retryWaitMs);
     }
 
+    List<SinkRecord> sinkRecords = new ArrayList<>(rows.keySet());
+
     // Check if the table specified exists
     // This error shouldn't be thrown. All tables should be created by the connector at startup
     Table table = executeWithRetry(() -> bigQuery.getTable(tableId), timeout);
@@ -139,7 +145,7 @@ public void writeRows(SortedMap<SinkRecord, RowToInsert> rows,
       logger.info("Table {} was not found. Creating the table automatically.", tableId);
       Boolean created =
           executeWithRetry(
-              () -> schemaManager.createTable(tableId, new ArrayList<>(rows.keySet())), timeout);
+              () -> schemaManager.createTable(tableId, sinkRecords), timeout);
       if (created == null || !created) {
         throw new BigQueryConnectException("Failed to create table " + tableId);
       }
@@ -151,6 +157,22 @@ public void writeRows(SortedMap<SinkRecord, RowToInsert> rows,
       throw new BigQueryConnectException("Failed to lookup table " + tableId);
     }
 
+    if (attemptSchemaUpdate && schemaManager != null && !sinkRecords.isEmpty()) {
+      Boolean schemaUpdated =
+          executeWithRetry(
+              () -> {
+                schemaManager.updateSchema(tableId, sinkRecords);
+                return Boolean.TRUE;
+              },
+              timeout
+          );
+      if (schemaUpdated == null) {
+        throw new ConnectException(
+            String.format("Failed to update schema for table %s within %d re-attempts.", tableId, retries)
+        );
+      }
+    }
+
     // --- Upload rows to GCS with executeWithRetry (fresh budget for uploads) ---
     Duration uploadTimeout = Duration.ofMillis(Math.max(0L, retryWaitMs * Math.max(1, retries)));
     if (retries == 0) {
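A note on the retry contract used above: the schema-update block treats a null result from executeWithRetry as "all attempts failed" and converts it into a ConnectException, and the lambda returns Boolean.TRUE only so that success can be told apart from exhaustion (updateSchema is invoked for its side effect). The helper's body is not part of this diff; the sketch below shows the contract the new code appears to rely on, retrying within a time budget and returning null once the budget is spent. It is an illustration under those assumptions, not the connector's actual implementation.

import java.time.Duration;
import java.util.function.Supplier;

final class ExecuteWithRetrySketch {
  // Hypothetical stand-in for GcsToBqWriter#executeWithRetry: keep invoking the action until it
  // yields a non-null value or the time budget runs out, sleeping between attempts, and return
  // null once the budget is exhausted so that callers decide how to fail.
  static <T> T executeWithRetry(Supplier<T> action, Duration budget, long waitMs)
      throws InterruptedException {
    long deadline = System.currentTimeMillis() + budget.toMillis();
    while (true) {
      try {
        T result = action.get();
        if (result != null) {
          return result;
        }
      } catch (RuntimeException e) {
        // A real implementation would log and likely retry only transient BigQuery/GCS errors.
      }
      if (System.currentTimeMillis() >= deadline) {
        return null; // e.g. writeRows() maps this to ConnectException / BigQueryConnectException
      }
      Thread.sleep(waitMs);
    }
  }
}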
GcsBatchSchemaEvolutionIT.java (new file)
@@ -0,0 +1,265 @@
/*
* Copyright 2024 Copyright 2022 Aiven Oy and
* bigquery-connector-for-apache-kafka project contributors
*
* This software contains code derived from the Confluent BigQuery
* Kafka Connector, Copyright Confluent, Inc, which in turn
* contains code derived from the WePay BigQuery Kafka Connector,
* Copyright WePay, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package com.wepay.kafka.connect.bigquery.integration;
Review comment (Contributor): This file needs the standard header for the build to complete.

Reply (Author): Added it 👍

import static io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG;
import static org.apache.kafka.connect.runtime.ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG;
import static org.apache.kafka.connect.runtime.ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG;
import static org.apache.kafka.connect.runtime.SinkConnectorConfig.TOPICS_CONFIG;
import static org.apache.kafka.test.TestUtils.waitForCondition;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.Field;
import com.google.cloud.bigquery.LegacySQLTypeName;
import com.google.cloud.bigquery.StandardTableDefinition;
import com.google.cloud.bigquery.Table;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.TableInfo;
import com.wepay.kafka.connect.bigquery.config.BigQuerySinkConfig;
import com.wepay.kafka.connect.bigquery.integration.utils.BucketClearer;
import com.wepay.kafka.connect.bigquery.integration.utils.SchemaRegistryTestUtils;
import com.wepay.kafka.connect.bigquery.integration.utils.TableClearer;
import com.wepay.kafka.connect.bigquery.retrieve.IdentitySchemaRetriever;
import io.confluent.connect.avro.AvroConverter;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.storage.Converter;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;

@Tag("integration")
public class GcsBatchSchemaEvolutionIT extends BaseConnectorIT {

private static final String CONNECTOR_NAME = "gcs-schema-evolution-connector";
private static final int TASKS_MAX = 1;
private static final Duration LOAD_TIMEOUT = Duration.ofMinutes(2);

private BigQuery bigQuery;
private SchemaRegistryTestUtils schemaRegistry;
private String schemaRegistryUrl;
private Converter keyConverter;
private Converter valueConverter;
private Schema keySchema;
private Schema valueSchemaV1;
private Schema valueSchemaV2;
private String topic;
private String table;
private String bucketName;

@BeforeEach
public void setup() throws Exception {
startConnect();
bigQuery = newBigQuery();

schemaRegistry = new SchemaRegistryTestUtils(connect.kafka().bootstrapServers());
schemaRegistry.start();
schemaRegistryUrl = schemaRegistry.schemaRegistryUrl();

initialiseSchemas();
initialiseConverters();

topic = suffixedTableOrTopic("gcs_schema_evolution");
table = suffixedAndSanitizedTable("gcs_schema_evolution");
bucketName = gcsBucket() + "-" + System.nanoTime();

connect.kafka().createTopic(topic);
TableClearer.clearTables(bigQuery, dataset(), table);
createInitialTable();
}

@AfterEach
public void tearDown() throws Exception {
try {
if (connect != null) {
connect.deleteConnector(CONNECTOR_NAME);
}
} finally {
if (schemaRegistry != null) {
schemaRegistry.stop();
}
if (bigQuery != null) {
TableClearer.clearTables(bigQuery, dataset(), table);
}
BucketClearer.clearBucket(keyFile(), project(), bucketName, gcsFolder(), keySource());
stopConnect();
}
}

@Test
public void testSchemaEvolutionAcrossBatchLoads() throws Exception {
connect.configureConnector(CONNECTOR_NAME, connectorProps());
waitForConnectorToStart(CONNECTOR_NAME, TASKS_MAX);

schemaRegistry.produceRecordsWithKey(
keyConverter,
valueConverter,
Collections.singletonList(recordV1(1L, "snacks")),
topic
);

waitForCommittedRecords(CONNECTOR_NAME, topic, 1, TASKS_MAX);
waitForRowCount(1L);

schemaRegistry.produceRecordsWithKey(
keyConverter,
valueConverter,
Collections.singletonList(recordV2(2L, null, "john")),
topic
);

waitForCommittedRecords(CONNECTOR_NAME, topic, 2, TASKS_MAX);
waitForRowCount(2L);

Table destinationTable = bigQuery.getTable(dataset(), table);
com.google.cloud.bigquery.Schema destinationSchema = destinationTable.getDefinition().getSchema();
Field categoryField = destinationSchema.getFields().get("category");
assertNotNull(categoryField, "category field should exist after load");
assertEquals(Field.Mode.NULLABLE, categoryField.getMode());
Field usernameField = destinationSchema.getFields().get("username");
assertNotNull(usernameField, "username field should be created");
assertEquals(Field.Mode.NULLABLE, usernameField.getMode());

List<List<Object>> rows = readAllRows(bigQuery, table, "id");
assertEquals(Arrays.asList(1L, "snacks", null), rows.get(0));
assertEquals(Arrays.asList(2L, null, "john"), rows.get(1));
}

private Map<String, String> connectorProps() {
Map<String, String> props = baseConnectorProps(TASKS_MAX);
props.put(TOPICS_CONFIG, topic);
props.put(
KEY_CONVERTER_CLASS_CONFIG,
AvroConverter.class.getName()
);
props.put(
KEY_CONVERTER_CLASS_CONFIG + "." + SCHEMA_REGISTRY_URL_CONFIG,
schemaRegistryUrl
);
props.put(
VALUE_CONVERTER_CLASS_CONFIG,
AvroConverter.class.getName()
);
props.put(
VALUE_CONVERTER_CLASS_CONFIG + "." + SCHEMA_REGISTRY_URL_CONFIG,
schemaRegistryUrl
);
props.put(BigQuerySinkConfig.ALLOW_NEW_BIGQUERY_FIELDS_CONFIG, "true");
props.put(BigQuerySinkConfig.ALLOW_BIGQUERY_REQUIRED_FIELD_RELAXATION_CONFIG, "true");
props.put(BigQuerySinkConfig.ENABLE_BATCH_CONFIG, topic + "," + table);
props.put(BigQuerySinkConfig.BATCH_LOAD_INTERVAL_SEC_CONFIG, "5");
props.put(BigQuerySinkConfig.GCS_BUCKET_NAME_CONFIG, bucketName);
props.put(BigQuerySinkConfig.GCS_FOLDER_NAME_CONFIG, gcsFolder());
props.put(BigQuerySinkConfig.SCHEMA_RETRIEVER_CONFIG, IdentitySchemaRetriever.class.getName());
props.put(BigQuerySinkConfig.TABLE_CREATE_CONFIG, "false");
return props;
}

private void initialiseSchemas() {
keySchema = SchemaBuilder.struct()
.name("com.wepay.kafka.connect.bigquery.integration.Key")
.field("id", Schema.INT64_SCHEMA)
.build();

valueSchemaV1 = SchemaBuilder.struct()
.name("com.wepay.kafka.connect.bigquery.integration.ValueV1")
.field("id", Schema.INT64_SCHEMA)
.field("category", Schema.STRING_SCHEMA)
.build();

valueSchemaV2 = SchemaBuilder.struct()
.name("com.wepay.kafka.connect.bigquery.integration.ValueV2")
.field("id", Schema.INT64_SCHEMA)
.field("category", SchemaBuilder.string().optional().build())
.field("username", SchemaBuilder.string().optional().build())
.build();
}

private void initialiseConverters() {
keyConverter = new AvroConverter();
valueConverter = new AvroConverter();
Map<String, Object> keyConfig = new HashMap<>();
keyConfig.put(SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
keyConverter.configure(keyConfig, true);
Map<String, Object> valueConfig = new HashMap<>();
valueConfig.put(SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
valueConverter.configure(valueConfig, false);
}

private List<SchemaAndValue> recordV1(long id, String category) {
Struct key = new Struct(keySchema)
.put("id", id);
Struct value = new Struct(valueSchemaV1)
.put("id", id)
.put("category", category);
List<SchemaAndValue> record = new ArrayList<>(2);
record.add(new SchemaAndValue(keySchema, key));
record.add(new SchemaAndValue(valueSchemaV1, value));
return record;
}

private List<SchemaAndValue> recordV2(long id, String category, String username) {
Struct key = new Struct(keySchema)
.put("id", id);
Struct value = new Struct(valueSchemaV2)
.put("id", id)
.put("category", category)
.put("username", username);
List<SchemaAndValue> record = new ArrayList<>(2);
record.add(new SchemaAndValue(keySchema, key));
record.add(new SchemaAndValue(valueSchemaV2, value));
return record;
}

private void createInitialTable() {
TableId tableId = TableId.of(dataset(), table);
com.google.cloud.bigquery.Schema schema = com.google.cloud.bigquery.Schema.of(
Field.newBuilder("id", LegacySQLTypeName.INTEGER).setMode(Field.Mode.REQUIRED).build(),
Field.newBuilder("category", LegacySQLTypeName.STRING).setMode(Field.Mode.REQUIRED).build()
);
bigQuery.create(TableInfo.newBuilder(tableId, StandardTableDefinition.of(schema)).build());
}

private void waitForRowCount(long expected) throws InterruptedException {
waitForCondition(() -> {
try {
return countRows(bigQuery, table) >= expected;
} catch (Exception e) {
return false;
}
}, LOAD_TIMEOUT.toMillis(), "Timed out waiting for " + expected + " rows");
}
}