Merged · Changes from 6 commits
@@ -142,6 +142,7 @@ public class BigQuerySinkTask extends SinkTask {
private boolean allowNewBigQueryFields;
private boolean useCredentialsProjectId;
private boolean allowRequiredFieldRelaxation;
private boolean allowSchemaUnionization;

/**
* Create a new BigquerySinkTask.
@@ -469,7 +470,6 @@ private SchemaManager newSchemaManager() {
Optional<Long> partitionExpiration = config.getPartitionExpirationMs();
Optional<List<String>> clusteringFieldName = config.getClusteringPartitionFieldNames();
Optional<TimePartitioning.Type> timePartitioningType = config.getTimePartitioningType();
boolean allowSchemaUnionization = config.getBoolean(BigQuerySinkConfig.ALLOW_SCHEMA_UNIONIZATION_CONFIG);
boolean sanitizeFieldNames = config.getBoolean(BigQuerySinkConfig.SANITIZE_FIELD_NAME_CONFIG);
return new SchemaManager(schemaRetriever, schemaConverter, getBigQuery(),
allowNewBigQueryFields, allowRequiredFieldRelaxation, allowSchemaUnionization,
@@ -513,15 +513,20 @@ private Storage getGcs() {

private GcsToBqWriter getGcsWriter() {
BigQuery bigQuery = getBigQuery();
// schemaManager shall only be needed for creating table hence do not fetch instance if not
// needed.
SchemaManager schemaManager = autoCreateTables ? getSchemaManager() : null;
boolean attemptSchemaUpdate = allowNewBigQueryFields
|| allowRequiredFieldRelaxation
|| allowSchemaUnionization;
// schemaManager shall only be needed for creating table or performing schema updates hence do
// not fetch instance if not needed.
boolean needsSchemaManager = autoCreateTables || attemptSchemaUpdate;
SchemaManager schemaManager = needsSchemaManager ? getSchemaManager() : null;
return new GcsToBqWriter(getGcs(),
bigQuery,
schemaManager,
retry,
retryWait,
autoCreateTables,
attemptSchemaUpdate,
time);
}

@@ -559,6 +564,7 @@ public void start(Map<String, String> properties) {
retryWait = config.getLong(BigQuerySinkConfig.BIGQUERY_RETRY_WAIT_CONFIG);
allowNewBigQueryFields = config.getBoolean(BigQuerySinkConfig.ALLOW_NEW_BIGQUERY_FIELDS_CONFIG);
allowRequiredFieldRelaxation = config.getBoolean(BigQuerySinkConfig.ALLOW_BIGQUERY_REQUIRED_FIELD_RELAXATION_CONFIG);
allowSchemaUnionization = config.getBoolean(BigQuerySinkConfig.ALLOW_SCHEMA_UNIONIZATION_CONFIG);
topicToPartitionTableId = new HashMap<>();
bigQuery = new AtomicReference<>();
schemaManager = new AtomicReference<>();
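Because the sink task now derives a single attemptSchemaUpdate flag from the three allow* settings and hands it to GcsToBqWriter (see the next file), enabling any one of those settings switches the GCS batch-load path over to attempting schema updates. A minimal sketch of the relevant settings, expressed with the same BigQuerySinkConfig constants the diff uses; the wrapper class and the chosen true/false values are illustrative only:

import com.wepay.kafka.connect.bigquery.config.BigQuerySinkConfig;

import java.util.HashMap;
import java.util.Map;

// Illustrative sketch: the connector settings that feed the new attemptSchemaUpdate flag.
// attemptSchemaUpdate is true when ANY of the three "allow*" options below is true; with
// all three false, getGcsWriter() only fetches a SchemaManager when autoCreateTables is on.
final class GcsSchemaUpdateFlagsSketch {

  static Map<String, String> schemaUpdateFlags() {
    Map<String, String> props = new HashMap<>();
    props.put(BigQuerySinkConfig.ALLOW_NEW_BIGQUERY_FIELDS_CONFIG, "true");
    props.put(BigQuerySinkConfig.ALLOW_BIGQUERY_REQUIRED_FIELD_RELAXATION_CONFIG, "true");
    props.put(BigQuerySinkConfig.ALLOW_SCHEMA_UNIONIZATION_CONFIG, "false");
    return props;
  }
}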
@@ -41,6 +41,7 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.SortedMap;
@@ -63,6 +64,7 @@ public class GcsToBqWriter {
private final Storage storage;
private final BigQuery bigQuery;
private final SchemaManager schemaManager;
private final boolean attemptSchemaUpdate;
private final Time time;
private int retries;
private long retryWaitMs;
@@ -87,10 +89,12 @@ public GcsToBqWriter(Storage storage,
int retries,
long retryWaitMs,
boolean autoCreateTables,
boolean attemptSchemaUpdate,
Time time) {
this.storage = storage;
this.bigQuery = bigQuery;
this.schemaManager = schemaManager;
this.attemptSchemaUpdate = attemptSchemaUpdate;
this.time = time;

this.retries = retries;
@@ -130,6 +134,8 @@ public void writeRows(SortedMap<SinkRecord, RowToInsert> rows,
timeout = Duration.ofMillis(retryWaitMs);
}

List<SinkRecord> sinkRecords = new ArrayList<>(rows.keySet());

// Check if the table specified exists
// This error shouldn't be thrown. All tables should be created by the connector at startup
Table table = executeWithRetry(() -> bigQuery.getTable(tableId), timeout);
@@ -139,7 +145,7 @@ public void writeRows(SortedMap<SinkRecord, RowToInsert> rows,
logger.info("Table {} was not found. Creating the table automatically.", tableId);
Boolean created =
executeWithRetry(
() -> schemaManager.createTable(tableId, new ArrayList<>(rows.keySet())), timeout);
() -> schemaManager.createTable(tableId, sinkRecords), timeout);
if (created == null || !created) {
throw new BigQueryConnectException("Failed to create table " + tableId);
}
@@ -151,6 +157,22 @@ public void writeRows(SortedMap<SinkRecord, RowToInsert> rows,
throw new BigQueryConnectException("Failed to lookup table " + tableId);
}

if (attemptSchemaUpdate && schemaManager != null && !sinkRecords.isEmpty()) {
Boolean schemaUpdated =
executeWithRetry(
() -> {
schemaManager.updateSchema(tableId, sinkRecords);
return Boolean.TRUE;
},
timeout
);
if (schemaUpdated == null) {
throw new ConnectException(
String.format("Failed to update schema for table %s within %d re-attempts.", tableId, retries)
);
}
}

// --- Upload rows to GCS with executeWithRetry (fresh budget for uploads) ---
Duration uploadTimeout = Duration.ofMillis(Math.max(0L, retryWaitMs * Math.max(1, retries)));
if (retries == 0) {
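The new schema-update block relies on the existing executeWithRetry helper returning null once its retry budget is exhausted, which writeRows() then converts into a ConnectException. That helper is not part of this diff; the sketch below only illustrates the contract inferred from the call sites above and is not the connector's actual implementation:

import java.time.Duration;
import java.util.function.Supplier;

// Inferred contract only: retry a supplier until it succeeds, the attempt budget is
// spent, or the deadline passes; return null to signal that the caller should give up.
final class ExecuteWithRetrySketch {

  static <T> T executeWithRetry(Supplier<T> operation, Duration timeout,
                                int retries, long retryWaitMs) throws InterruptedException {
    long deadline = System.currentTimeMillis() + timeout.toMillis();
    for (int attempt = 0; attempt <= retries; attempt++) {
      try {
        return operation.get();                 // success: hand back the supplier's result
      } catch (RuntimeException e) {
        if (attempt == retries || System.currentTimeMillis() >= deadline) {
          break;                                // budget exhausted; fall through to null
        }
        Thread.sleep(retryWaitMs);              // back off before the next attempt
      }
    }
    return null;                                // writeRows() maps this to a ConnectException
  }
}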
@@ -0,0 +1,242 @@
package com.wepay.kafka.connect.bigquery.integration;
Contributor:
This file needs the standard header for the build to complete.

Contributor (Author):
Added it 👍
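For reference, the "standard header" requested above is presumably the license header that the project's header check requires at the top of every source file. The exact text is repository-specific; the block below is a generic Apache-2.0 style header with placeholder copyright wording, not the project's verbatim header:

/*
 * Copyright <year> <copyright holder>
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */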


import static io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG;
import static org.apache.kafka.connect.runtime.ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG;
import static org.apache.kafka.connect.runtime.ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG;
import static org.apache.kafka.connect.runtime.SinkConnectorConfig.TOPICS_CONFIG;
import static org.apache.kafka.test.TestUtils.waitForCondition;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.Field;
import com.google.cloud.bigquery.LegacySQLTypeName;
import com.google.cloud.bigquery.StandardTableDefinition;
import com.google.cloud.bigquery.Table;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.TableInfo;
import com.wepay.kafka.connect.bigquery.config.BigQuerySinkConfig;
import com.wepay.kafka.connect.bigquery.integration.utils.BucketClearer;
import com.wepay.kafka.connect.bigquery.integration.utils.SchemaRegistryTestUtils;
import com.wepay.kafka.connect.bigquery.integration.utils.TableClearer;
import com.wepay.kafka.connect.bigquery.retrieve.IdentitySchemaRetriever;
import io.confluent.connect.avro.AvroConverter;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.storage.Converter;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;

@Tag("integration")
public class GcsBatchSchemaEvolutionIT extends BaseConnectorIT {

private static final String CONNECTOR_NAME = "gcs-schema-evolution-connector";
private static final int TASKS_MAX = 1;
private static final Duration LOAD_TIMEOUT = Duration.ofMinutes(2);

private BigQuery bigQuery;
private SchemaRegistryTestUtils schemaRegistry;
private String schemaRegistryUrl;
private Converter keyConverter;
private Converter valueConverter;
private Schema keySchema;
private Schema valueSchemaV1;
private Schema valueSchemaV2;
private String topic;
private String table;
private String bucketName;

@BeforeEach
public void setup() throws Exception {
startConnect();
bigQuery = newBigQuery();

schemaRegistry = new SchemaRegistryTestUtils(connect.kafka().bootstrapServers());
schemaRegistry.start();
schemaRegistryUrl = schemaRegistry.schemaRegistryUrl();

initialiseSchemas();
initialiseConverters();

topic = suffixedTableOrTopic("gcs_schema_evolution");
table = suffixedAndSanitizedTable("gcs_schema_evolution");
bucketName = gcsBucket() + "-" + System.nanoTime();

connect.kafka().createTopic(topic);
TableClearer.clearTables(bigQuery, dataset(), table);
createInitialTable();
}

@AfterEach
public void tearDown() throws Exception {
try {
if (connect != null) {
connect.deleteConnector(CONNECTOR_NAME);
}
} finally {
if (schemaRegistry != null) {
schemaRegistry.stop();
}
if (bigQuery != null) {
TableClearer.clearTables(bigQuery, dataset(), table);
}
BucketClearer.clearBucket(keyFile(), project(), bucketName, gcsFolder(), keySource());
stopConnect();
}
}

@Test
public void testSchemaEvolutionAcrossBatchLoads() throws Exception {
connect.configureConnector(CONNECTOR_NAME, connectorProps());
waitForConnectorToStart(CONNECTOR_NAME, TASKS_MAX);

schemaRegistry.produceRecordsWithKey(
keyConverter,
valueConverter,
Collections.singletonList(recordV1(1L, "snacks")),
topic
);

waitForCommittedRecords(CONNECTOR_NAME, topic, 1, TASKS_MAX);
waitForRowCount(1L);

schemaRegistry.produceRecordsWithKey(
keyConverter,
valueConverter,
Collections.singletonList(recordV2(2L, null, "john")),
topic
);

waitForCommittedRecords(CONNECTOR_NAME, topic, 2, TASKS_MAX);
waitForRowCount(2L);

Table destinationTable = bigQuery.getTable(dataset(), table);
com.google.cloud.bigquery.Schema destinationSchema = destinationTable.getDefinition().getSchema();
Field categoryField = destinationSchema.getFields().get("category");
assertNotNull(categoryField, "category field should exist after load");
assertEquals(Field.Mode.NULLABLE, categoryField.getMode());
Field usernameField = destinationSchema.getFields().get("username");
assertNotNull(usernameField, "username field should be created");
assertEquals(Field.Mode.NULLABLE, usernameField.getMode());

List<List<Object>> rows = readAllRows(bigQuery, table, "id");
assertEquals(Arrays.asList(1L, "snacks", null), rows.get(0));
assertEquals(Arrays.asList(2L, null, "john"), rows.get(1));
}

private Map<String, String> connectorProps() {
Map<String, String> props = baseConnectorProps(TASKS_MAX);
props.put(TOPICS_CONFIG, topic);
props.put(
KEY_CONVERTER_CLASS_CONFIG,
AvroConverter.class.getName()
);
props.put(
KEY_CONVERTER_CLASS_CONFIG + "." + SCHEMA_REGISTRY_URL_CONFIG,
schemaRegistryUrl
);
props.put(
VALUE_CONVERTER_CLASS_CONFIG,
AvroConverter.class.getName()
);
props.put(
VALUE_CONVERTER_CLASS_CONFIG + "." + SCHEMA_REGISTRY_URL_CONFIG,
schemaRegistryUrl
);
props.put(BigQuerySinkConfig.ALLOW_NEW_BIGQUERY_FIELDS_CONFIG, "true");
props.put(BigQuerySinkConfig.ALLOW_BIGQUERY_REQUIRED_FIELD_RELAXATION_CONFIG, "true");
props.put(BigQuerySinkConfig.ENABLE_BATCH_CONFIG, topic + "," + table);
props.put(BigQuerySinkConfig.BATCH_LOAD_INTERVAL_SEC_CONFIG, "5");
props.put(BigQuerySinkConfig.GCS_BUCKET_NAME_CONFIG, bucketName);
props.put(BigQuerySinkConfig.GCS_FOLDER_NAME_CONFIG, gcsFolder());
props.put(BigQuerySinkConfig.SCHEMA_RETRIEVER_CONFIG, IdentitySchemaRetriever.class.getName());
props.put(BigQuerySinkConfig.TABLE_CREATE_CONFIG, "false");
return props;
}

private void initialiseSchemas() {
keySchema = SchemaBuilder.struct()
.name("com.wepay.kafka.connect.bigquery.integration.Key")
.field("id", Schema.INT64_SCHEMA)
.build();

valueSchemaV1 = SchemaBuilder.struct()
.name("com.wepay.kafka.connect.bigquery.integration.ValueV1")
.field("id", Schema.INT64_SCHEMA)
.field("category", Schema.STRING_SCHEMA)
.build();

valueSchemaV2 = SchemaBuilder.struct()
.name("com.wepay.kafka.connect.bigquery.integration.ValueV2")
.field("id", Schema.INT64_SCHEMA)
.field("category", SchemaBuilder.string().optional().build())
.field("username", SchemaBuilder.string().optional().build())
.build();
}

private void initialiseConverters() {
keyConverter = new AvroConverter();
valueConverter = new AvroConverter();
Map<String, Object> keyConfig = new HashMap<>();
keyConfig.put(SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
keyConverter.configure(keyConfig, true);
Map<String, Object> valueConfig = new HashMap<>();
valueConfig.put(SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
valueConverter.configure(valueConfig, false);
}

private List<SchemaAndValue> recordV1(long id, String category) {
Struct key = new Struct(keySchema)
.put("id", id);
Struct value = new Struct(valueSchemaV1)
.put("id", id)
.put("category", category);
List<SchemaAndValue> record = new ArrayList<>(2);
record.add(new SchemaAndValue(keySchema, key));
record.add(new SchemaAndValue(valueSchemaV1, value));
return record;
}

private List<SchemaAndValue> recordV2(long id, String category, String username) {
Struct key = new Struct(keySchema)
.put("id", id);
Struct value = new Struct(valueSchemaV2)
.put("id", id)
.put("category", category)
.put("username", username);
List<SchemaAndValue> record = new ArrayList<>(2);
record.add(new SchemaAndValue(keySchema, key));
record.add(new SchemaAndValue(valueSchemaV2, value));
return record;
}

private void createInitialTable() {
TableId tableId = TableId.of(dataset(), table);
com.google.cloud.bigquery.Schema schema = com.google.cloud.bigquery.Schema.of(
Field.newBuilder("id", LegacySQLTypeName.INTEGER).setMode(Field.Mode.REQUIRED).build(),
Field.newBuilder("category", LegacySQLTypeName.STRING).setMode(Field.Mode.REQUIRED).build()
);
bigQuery.create(TableInfo.newBuilder(tableId, StandardTableDefinition.of(schema)).build());
}

private void waitForRowCount(long expected) throws InterruptedException {
waitForCondition(() -> {
try {
return countRows(bigQuery, table) >= expected;
} catch (Exception e) {
return false;
}
}, LOAD_TIMEOUT.toMillis(), "Timed out waiting for " + expected + " rows");
}
}