diff --git a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTask.java b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTask.java
index ae285985..87306780 100644
--- a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTask.java
+++ b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTask.java
@@ -142,6 +142,7 @@ public class BigQuerySinkTask extends SinkTask {
   private boolean allowNewBigQueryFields;
   private boolean useCredentialsProjectId;
   private boolean allowRequiredFieldRelaxation;
+  private boolean allowSchemaUnionization;
 
   /**
    * Create a new BigquerySinkTask.
@@ -469,7 +470,6 @@ private SchemaManager newSchemaManager() {
     Optional<Long> partitionExpiration = config.getPartitionExpirationMs();
     Optional<List<String>> clusteringFieldName = config.getClusteringPartitionFieldNames();
     Optional<TimePartitioning.Type> timePartitioningType = config.getTimePartitioningType();
-    boolean allowSchemaUnionization = config.getBoolean(BigQuerySinkConfig.ALLOW_SCHEMA_UNIONIZATION_CONFIG);
     boolean sanitizeFieldNames = config.getBoolean(BigQuerySinkConfig.SANITIZE_FIELD_NAME_CONFIG);
     return new SchemaManager(schemaRetriever, schemaConverter, getBigQuery(),
         allowNewBigQueryFields, allowRequiredFieldRelaxation, allowSchemaUnionization,
@@ -513,15 +513,20 @@ private Storage getGcs() {
 
   private GcsToBqWriter getGcsWriter() {
     BigQuery bigQuery = getBigQuery();
-    // schemaManager shall only be needed for creating table hence do not fetch instance if not
-    // needed.
-    SchemaManager schemaManager = autoCreateTables ? getSchemaManager() : null;
+    boolean attemptSchemaUpdate = allowNewBigQueryFields
+        || allowRequiredFieldRelaxation
+        || allowSchemaUnionization;
+    // schemaManager shall only be needed for creating table or performing schema updates hence do
+    // not fetch instance if not needed.
+    boolean needsSchemaManager = autoCreateTables || attemptSchemaUpdate;
+    SchemaManager schemaManager = needsSchemaManager ? getSchemaManager() : null;
     return new GcsToBqWriter(getGcs(),
         bigQuery,
         schemaManager,
         retry,
         retryWait,
         autoCreateTables,
+        attemptSchemaUpdate,
         time);
   }
 
@@ -559,6 +564,7 @@ public void start(Map<String, String> properties) {
     retryWait = config.getLong(BigQuerySinkConfig.BIGQUERY_RETRY_WAIT_CONFIG);
     allowNewBigQueryFields = config.getBoolean(BigQuerySinkConfig.ALLOW_NEW_BIGQUERY_FIELDS_CONFIG);
     allowRequiredFieldRelaxation = config.getBoolean(BigQuerySinkConfig.ALLOW_BIGQUERY_REQUIRED_FIELD_RELAXATION_CONFIG);
+    allowSchemaUnionization = config.getBoolean(BigQuerySinkConfig.ALLOW_SCHEMA_UNIONIZATION_CONFIG);
     topicToPartitionTableId = new HashMap<>();
     bigQuery = new AtomicReference<>();
     schemaManager = new AtomicReference<>();
diff --git a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/row/GcsToBqWriter.java b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/row/GcsToBqWriter.java
index 08e538ae..ced559b9 100644
--- a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/row/GcsToBqWriter.java
+++ b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/row/GcsToBqWriter.java
@@ -41,6 +41,7 @@
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import java.util.SortedMap;
@@ -63,6 +64,7 @@ public class GcsToBqWriter {
   private final Storage storage;
   private final BigQuery bigQuery;
   private final SchemaManager schemaManager;
+  private final boolean attemptSchemaUpdate;
   private final Time time;
   private int retries;
   private long retryWaitMs;
@@ -87,10 +89,12 @@ public GcsToBqWriter(Storage storage,
                        int retries,
                        long retryWaitMs,
                        boolean autoCreateTables,
+                       boolean attemptSchemaUpdate,
                        Time time) {
     this.storage = storage;
     this.bigQuery = bigQuery;
     this.schemaManager = schemaManager;
+    this.attemptSchemaUpdate = attemptSchemaUpdate;
     this.time = time;
 
     this.retries = retries;
@@ -130,6 +134,8 @@ public void writeRows(SortedMap<SinkRecord, RowToInsert> rows,
       timeout = Duration.ofMillis(retryWaitMs);
     }
 
+    List<SinkRecord> sinkRecords = new ArrayList<>(rows.keySet());
+
     // Check if the table specified exists
     // This error shouldn't be thrown. All tables should be created by the connector at startup
     Table table = executeWithRetry(() -> bigQuery.getTable(tableId), timeout);
@@ -139,7 +145,7 @@ public void writeRows(SortedMap<SinkRecord, RowToInsert> rows,
       logger.info("Table {} was not found. Creating the table automatically.", tableId);
       Boolean created = executeWithRetry(
-          () -> schemaManager.createTable(tableId, new ArrayList<>(rows.keySet())), timeout);
+          () -> schemaManager.createTable(tableId, sinkRecords), timeout);
       if (created == null || !created) {
         throw new BigQueryConnectException("Failed to create table " + tableId);
       }
@@ -151,6 +157,22 @@ public void writeRows(SortedMap<SinkRecord, RowToInsert> rows,
       throw new BigQueryConnectException("Failed to lookup table " + tableId);
     }
 
+    if (attemptSchemaUpdate && schemaManager != null && !sinkRecords.isEmpty()) {
+      Boolean schemaUpdated =
+          executeWithRetry(
+              () -> {
+                schemaManager.updateSchema(tableId, sinkRecords);
+                return Boolean.TRUE;
+              },
+              timeout
+          );
+      if (schemaUpdated == null) {
+        throw new ConnectException(
+            String.format("Failed to update schema for table %s within %d re-attempts.", tableId, retries)
+        );
+      }
+    }
+
     // --- Upload rows to GCS with executeWithRetry (fresh budget for uploads) ---
     Duration uploadTimeout = Duration.ofMillis(Math.max(0L, retryWaitMs * Math.max(1, retries)));
     if (retries == 0) {
diff --git a/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/integration/GcsBatchSchemaEvolutionIT.java b/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/integration/GcsBatchSchemaEvolutionIT.java
new file mode 100644
index 00000000..05c248e8
--- /dev/null
+++ b/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/integration/GcsBatchSchemaEvolutionIT.java
@@ -0,0 +1,265 @@
+/*
+ * Copyright 2024 Copyright 2022 Aiven Oy and
+ * bigquery-connector-for-apache-kafka project contributors
+ *
+ * This software contains code derived from the Confluent BigQuery
+ * Kafka Connector, Copyright Confluent, Inc, which in turn
+ * contains code derived from the WePay BigQuery Kafka Connector,
+ * Copyright WePay, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.wepay.kafka.connect.bigquery.integration;
+
+import static io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG;
+import static org.apache.kafka.connect.runtime.SinkConnectorConfig.TOPICS_CONFIG;
+import static org.apache.kafka.test.TestUtils.waitForCondition;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+import com.google.cloud.bigquery.BigQuery;
+import com.google.cloud.bigquery.Field;
+import com.google.cloud.bigquery.LegacySQLTypeName;
+import com.google.cloud.bigquery.StandardTableDefinition;
+import com.google.cloud.bigquery.Table;
+import com.google.cloud.bigquery.TableId;
+import com.google.cloud.bigquery.TableInfo;
+import com.wepay.kafka.connect.bigquery.config.BigQuerySinkConfig;
+import com.wepay.kafka.connect.bigquery.integration.utils.BucketClearer;
+import com.wepay.kafka.connect.bigquery.integration.utils.SchemaRegistryTestUtils;
+import com.wepay.kafka.connect.bigquery.integration.utils.TableClearer;
+import com.wepay.kafka.connect.bigquery.retrieve.IdentitySchemaRetriever;
+import io.confluent.connect.avro.AvroConverter;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaAndValue;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.storage.Converter;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+
+@Tag("integration")
+public class GcsBatchSchemaEvolutionIT extends BaseConnectorIT {
+
+  private static final String CONNECTOR_NAME = "gcs-schema-evolution-connector";
+  private static final int TASKS_MAX = 1;
+  private static final Duration LOAD_TIMEOUT = Duration.ofMinutes(2);
+
+  private BigQuery bigQuery;
+  private SchemaRegistryTestUtils schemaRegistry;
+  private String schemaRegistryUrl;
+  private Converter keyConverter;
+  private Converter valueConverter;
+  private Schema keySchema;
+  private Schema valueSchemaV1;
+  private Schema valueSchemaV2;
+  private String topic;
+  private String table;
+  private String bucketName;
+
+  @BeforeEach
+  public void setup() throws Exception {
+    startConnect();
+    bigQuery = newBigQuery();
+
+    schemaRegistry = new SchemaRegistryTestUtils(connect.kafka().bootstrapServers());
+    schemaRegistry.start();
+    schemaRegistryUrl = schemaRegistry.schemaRegistryUrl();
+
+    initialiseSchemas();
+    initialiseConverters();
+
+    topic = suffixedTableOrTopic("gcs_schema_evolution");
+    table = suffixedAndSanitizedTable("gcs_schema_evolution");
+    bucketName = gcsBucket() + "-" + System.nanoTime();
+
+    connect.kafka().createTopic(topic);
+    TableClearer.clearTables(bigQuery, dataset(), table);
+    createInitialTable();
+  }
+
+  @AfterEach
+  public void tearDown() throws Exception {
+    try {
+      if (connect != null) {
+        connect.deleteConnector(CONNECTOR_NAME);
+      }
+    } finally {
+      if (schemaRegistry != null) {
+        schemaRegistry.stop();
+      }
+      if (bigQuery != null) {
+        TableClearer.clearTables(bigQuery, dataset(), table);
+      }
+      BucketClearer.clearBucket(keyFile(), project(), bucketName, gcsFolder(), keySource());
+      stopConnect();
+    }
+  }
+
+  @Test
+  public void testSchemaEvolutionAcrossBatchLoads() throws Exception {
+    connect.configureConnector(CONNECTOR_NAME, connectorProps());
+    waitForConnectorToStart(CONNECTOR_NAME, TASKS_MAX);
+
+    schemaRegistry.produceRecordsWithKey(
+        keyConverter,
+        valueConverter,
+        Collections.singletonList(recordV1(1L, "snacks")),
+        topic
+    );
+
+    waitForCommittedRecords(CONNECTOR_NAME, topic, 1, TASKS_MAX);
+    waitForRowCount(1L);
+
+    schemaRegistry.produceRecordsWithKey(
+        keyConverter,
+        valueConverter,
+        Collections.singletonList(recordV2(2L, null, "john")),
+        topic
+    );
+
+    waitForCommittedRecords(CONNECTOR_NAME, topic, 2, TASKS_MAX);
+    waitForRowCount(2L);
+
+    Table destinationTable = bigQuery.getTable(dataset(), table);
+    com.google.cloud.bigquery.Schema destinationSchema = destinationTable.getDefinition().getSchema();
+    Field categoryField = destinationSchema.getFields().get("category");
+    assertNotNull(categoryField, "category field should exist after load");
+    assertEquals(Field.Mode.NULLABLE, categoryField.getMode());
+    Field usernameField = destinationSchema.getFields().get("username");
+    assertNotNull(usernameField, "username field should be created");
+    assertEquals(Field.Mode.NULLABLE, usernameField.getMode());
+
+    List<List<Object>> rows = readAllRows(bigQuery, table, "id");
+    assertEquals(Arrays.asList(1L, "snacks", null), rows.get(0));
+    assertEquals(Arrays.asList(2L, null, "john"), rows.get(1));
+  }
+
+  private Map<String, String> connectorProps() {
+    Map<String, String> props = baseConnectorProps(TASKS_MAX);
+    props.put(TOPICS_CONFIG, topic);
+    props.put(
+        KEY_CONVERTER_CLASS_CONFIG,
+        AvroConverter.class.getName()
+    );
+    props.put(
+        KEY_CONVERTER_CLASS_CONFIG + "." + SCHEMA_REGISTRY_URL_CONFIG,
+        schemaRegistryUrl
+    );
+    props.put(
+        VALUE_CONVERTER_CLASS_CONFIG,
+        AvroConverter.class.getName()
+    );
+    props.put(
+        VALUE_CONVERTER_CLASS_CONFIG + "." + SCHEMA_REGISTRY_URL_CONFIG,
+        schemaRegistryUrl
+    );
+    props.put(BigQuerySinkConfig.ALLOW_NEW_BIGQUERY_FIELDS_CONFIG, "true");
+    props.put(BigQuerySinkConfig.ALLOW_BIGQUERY_REQUIRED_FIELD_RELAXATION_CONFIG, "true");
+    props.put(BigQuerySinkConfig.ENABLE_BATCH_CONFIG, topic + "," + table);
+    props.put(BigQuerySinkConfig.BATCH_LOAD_INTERVAL_SEC_CONFIG, "5");
+    props.put(BigQuerySinkConfig.GCS_BUCKET_NAME_CONFIG, bucketName);
+    props.put(BigQuerySinkConfig.GCS_FOLDER_NAME_CONFIG, gcsFolder());
+    props.put(BigQuerySinkConfig.SCHEMA_RETRIEVER_CONFIG, IdentitySchemaRetriever.class.getName());
+    props.put(BigQuerySinkConfig.TABLE_CREATE_CONFIG, "false");
+    return props;
+  }
+
+  private void initialiseSchemas() {
+    keySchema = SchemaBuilder.struct()
+        .name("com.wepay.kafka.connect.bigquery.integration.Key")
+        .field("id", Schema.INT64_SCHEMA)
+        .build();
+
+    valueSchemaV1 = SchemaBuilder.struct()
+        .name("com.wepay.kafka.connect.bigquery.integration.ValueV1")
+        .field("id", Schema.INT64_SCHEMA)
+        .field("category", Schema.STRING_SCHEMA)
+        .build();
+
+    valueSchemaV2 = SchemaBuilder.struct()
+        .name("com.wepay.kafka.connect.bigquery.integration.ValueV2")
+        .field("id", Schema.INT64_SCHEMA)
+        .field("category", SchemaBuilder.string().optional().build())
+        .field("username", SchemaBuilder.string().optional().build())
+        .build();
+  }
+
+  private void initialiseConverters() {
+    keyConverter = new AvroConverter();
+    valueConverter = new AvroConverter();
+    Map<String, Object> keyConfig = new HashMap<>();
+    keyConfig.put(SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
+    keyConverter.configure(keyConfig, true);
+    Map<String, Object> valueConfig = new HashMap<>();
+    valueConfig.put(SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
+    valueConverter.configure(valueConfig, false);
+  }
+
+  private List<SchemaAndValue> recordV1(long id, String category) {
+    Struct key = new Struct(keySchema)
+        .put("id", id);
+    Struct value = new Struct(valueSchemaV1)
+        .put("id", id)
+        .put("category", category);
+    List<SchemaAndValue> record = new ArrayList<>(2);
+    record.add(new SchemaAndValue(keySchema, key));
+    record.add(new SchemaAndValue(valueSchemaV1, value));
+    return record;
+  }
+
+  private List<SchemaAndValue> recordV2(long id, String category, String username) {
+    Struct key = new Struct(keySchema)
+        .put("id", id);
+    Struct value = new Struct(valueSchemaV2)
+        .put("id", id)
+        .put("category", category)
+        .put("username", username);
+    List<SchemaAndValue> record = new ArrayList<>(2);
+    record.add(new SchemaAndValue(keySchema, key));
+    record.add(new SchemaAndValue(valueSchemaV2, value));
+    return record;
+  }
+
+  private void createInitialTable() {
+    TableId tableId = TableId.of(dataset(), table);
+    com.google.cloud.bigquery.Schema schema = com.google.cloud.bigquery.Schema.of(
+        Field.newBuilder("id", LegacySQLTypeName.INTEGER).setMode(Field.Mode.REQUIRED).build(),
+        Field.newBuilder("category", LegacySQLTypeName.STRING).setMode(Field.Mode.REQUIRED).build()
+    );
+    bigQuery.create(TableInfo.newBuilder(tableId, StandardTableDefinition.of(schema)).build());
+  }
+
+  private void waitForRowCount(long expected) throws InterruptedException {
+    waitForCondition(() -> {
+      try {
+        return countRows(bigQuery, table) >= expected;
+      } catch (Exception e) {
+        return false;
+      }
+    }, LOAD_TIMEOUT.toMillis(), "Timed out waiting for " + expected + " rows");
+  }
+}
\ No newline at end of file
diff --git a/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/write/row/GcsToBqWriterTest.java b/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/write/row/GcsToBqWriterTest.java
index c4a34c77..415cf93f 100644
--- a/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/write/row/GcsToBqWriterTest.java
+++ b/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/write/row/GcsToBqWriterTest.java
@@ -27,6 +27,7 @@
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyList;
 import static org.mockito.Matchers.anyObject;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
@@ -220,7 +221,7 @@ public void happyPathNoRetry() throws Exception {
 
     GcsToBqWriter writer =
         new GcsToBqWriter(
-            storage, bigQuery, schemaManager, retries, retryWaitMs, autoCreate, mockTime);
+            storage, bigQuery, schemaManager, retries, retryWaitMs, autoCreate, false, mockTime);
 
     long t0 = mockTime.milliseconds();
     writer.writeRows(oneRow(), TableId.of("ds", "tbl"), "bucket", "blob");
@@ -228,11 +229,38 @@ public void happyPathNoRetry() throws Exception {
 
     // One lookup, one upload; no retries, no sleeps → elapsed should be 0
     verify(bigQuery, times(1)).getTable(any(TableId.class));
+    verify(schemaManager, never()).updateSchema(any(TableId.class), anyList());
     verify(storage, times(1)).create(any(BlobInfo.class), any(byte[].class));
-    verifyNoMoreInteractions(storage, bigQuery);
+    verifyNoMoreInteractions(storage, bigQuery, schemaManager);
     assertEquals(0L, elapsed, "no backoff should occur on the happy path");
   }
 
+  @Test
+  public void schemaUpdatedWhenEnabled() throws Exception {
+    int retries = 1;
+    long retryWaitMs = 100;
+    boolean autoCreate = false;
+
+    BigQuery bigQuery = mock(BigQuery.class);
+    when(bigQuery.getTable(any(TableId.class))).thenReturn(mock(Table.class));
+
+    Storage storage = mock(Storage.class);
+    when(storage.create(any(BlobInfo.class), any(byte[].class))).thenReturn(null);
+
+    SchemaManager schemaManager = mock(SchemaManager.class);
+    Time mockTime = new MockTime();
+
+    GcsToBqWriter writer =
+        new GcsToBqWriter(
+            storage, bigQuery, schemaManager, retries, retryWaitMs, autoCreate, true, mockTime);
+
+    writer.writeRows(oneRow(), TableId.of("ds", "tbl"), "bucket", "blob");
+
+    verify(schemaManager, times(1)).updateSchema(any(TableId.class), anyList());
+    verify(storage, times(1)).create(any(BlobInfo.class), any(byte[].class));
+  }
+
   @Test
   public void backoffIsCapped() throws Exception {
     int retries = 4; // allow 3 sleeps then success
@@ -254,7 +282,7 @@ public void backoffIsCapped() throws Exception {
 
     GcsToBqWriter writer =
         new GcsToBqWriter(
-            storage, bigQuery, schemaManager, retries, retryWaitMs, autoCreate, mockTime);
+            storage, bigQuery, schemaManager, retries, retryWaitMs, autoCreate, true, mockTime);
 
     long t0 = mockTime.milliseconds();
     writer.writeRows(oneRow(), TableId.of("ds", "tbl"), "bucket", "blob");
@@ -262,6 +290,7 @@
     long minExpected = 20_000; // Budget = retries(4) * retryWaitMs(5000) = 20s
     long maxExpected = minExpected + 3 * 1000; // + jitter bound
 
+    verify(schemaManager, times(1)).updateSchema(any(TableId.class), anyList());
     verify(storage, times(4)).create(any(BlobInfo.class), any(byte[].class));
     assertTrue(elapsed >= minExpected, "elapsed too small: " + elapsed);
     assertTrue(elapsed <= maxExpected, "elapsed too large: " + elapsed);
@@ -297,7 +326,7 @@ public void budgetCutsBeforeAllRetries() throws Exception {
 
     GcsToBqWriter writer =
         new GcsToBqWriter(
-            storage, bigQuery, schemaManager, retries, retryWaitMs, autoCreate, mockTime);
+            storage, bigQuery, schemaManager, retries, retryWaitMs, autoCreate, true, mockTime);
 
     // Because lookup never succeeds and the time budget expires, writeRows should fail
     // with a BigQueryConnectException (null table interpreted as lookup failure).
@@ -311,6 +340,8 @@
     verify(bigQuery, atLeast(6)).getTable(any(TableId.class));
     verify(bigQuery, atMost(10)).getTable(any(TableId.class));
 
+    verify(schemaManager, never()).updateSchema(any(TableId.class), anyList());
+
    // No upload should be attempted since table resolution never succeeded.
     verify(storage, never()).create(any(BlobInfo.class), any(byte[].class));
   }
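
For reference, a minimal sketch of how a caller is expected to wire the new attemptSchemaUpdate flag when constructing GcsToBqWriter, mirroring BigQuerySinkTask#getGcsWriter() in this patch; config, retry, retryWait, autoCreateTables, getGcs(), getBigQuery(), getSchemaManager(), and time are assumed to be in scope as they are in the sink task.

    // Schema updates are attempted whenever any of the three schema-evolution configs is enabled.
    boolean allowNewBigQueryFields =
        config.getBoolean(BigQuerySinkConfig.ALLOW_NEW_BIGQUERY_FIELDS_CONFIG);
    boolean allowRequiredFieldRelaxation =
        config.getBoolean(BigQuerySinkConfig.ALLOW_BIGQUERY_REQUIRED_FIELD_RELAXATION_CONFIG);
    boolean allowSchemaUnionization =
        config.getBoolean(BigQuerySinkConfig.ALLOW_SCHEMA_UNIONIZATION_CONFIG);
    boolean attemptSchemaUpdate =
        allowNewBigQueryFields || allowRequiredFieldRelaxation || allowSchemaUnionization;

    // The schema manager is only fetched when table creation or a schema update may be needed.
    SchemaManager schemaManager =
        (autoCreateTables || attemptSchemaUpdate) ? getSchemaManager() : null;

    GcsToBqWriter writer = new GcsToBqWriter(
        getGcs(),
        getBigQuery(),
        schemaManager,
        retry,
        retryWait,
        autoCreateTables,
        attemptSchemaUpdate, // new parameter introduced by this patch
        time);

With this wiring, GcsToBqWriter.writeRows() calls schemaManager.updateSchema(tableId, sinkRecords) before uploading the batch to GCS, which is what allows the subsequent BigQuery load job to succeed after the value schema evolves, as exercised by GcsBatchSchemaEvolutionIT.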