
Commit 595b284

Add feature to update table schemas for GCStoBQ load jobs (#134)
* Add feature to update table schemas for GCStoBQ load jobs
1 parent da127d9 commit 595b284

4 files changed: +333 -9 lines changed

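For context, the code path touched by this commit only runs when batch loading through GCS is enabled and at least one schema-update permission is set. Below is a minimal sketch of the relevant sink properties, using the BigQuerySinkConfig constants referenced in this commit; the wrapper class and the concrete topic/bucket/interval values are placeholders, not part of the change.

import com.wepay.kafka.connect.bigquery.config.BigQuerySinkConfig;
import java.util.HashMap;
import java.util.Map;

// Sketch only: property keys come from the BigQuerySinkConfig constants used in this
// commit; the values are hypothetical.
class GcsBatchSchemaUpdateProps {
  static Map<String, String> example() {
    Map<String, String> props = new HashMap<>();
    // Topics routed through GCS batch load jobs instead of streaming inserts.
    props.put(BigQuerySinkConfig.ENABLE_BATCH_CONFIG, "my-topic");
    props.put(BigQuerySinkConfig.GCS_BUCKET_NAME_CONFIG, "my-staging-bucket");
    props.put(BigQuerySinkConfig.BATCH_LOAD_INTERVAL_SEC_CONFIG, "120");
    // Any one of these three flags now makes GcsToBqWriter attempt a schema update
    // (via SchemaManager.updateSchema) before rows are staged to GCS.
    props.put(BigQuerySinkConfig.ALLOW_NEW_BIGQUERY_FIELDS_CONFIG, "true");
    props.put(BigQuerySinkConfig.ALLOW_BIGQUERY_REQUIRED_FIELD_RELAXATION_CONFIG, "true");
    props.put(BigQuerySinkConfig.ALLOW_SCHEMA_UNIONIZATION_CONFIG, "false");
    return props;
  }
}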

kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTask.java

Lines changed: 10 additions & 4 deletions
@@ -142,6 +142,7 @@ public class BigQuerySinkTask extends SinkTask {
   private boolean allowNewBigQueryFields;
   private boolean useCredentialsProjectId;
   private boolean allowRequiredFieldRelaxation;
+  private boolean allowSchemaUnionization;
 
   /**
    * Create a new BigquerySinkTask.
@@ -469,7 +470,6 @@ private SchemaManager newSchemaManager() {
     Optional<Long> partitionExpiration = config.getPartitionExpirationMs();
     Optional<List<String>> clusteringFieldName = config.getClusteringPartitionFieldNames();
     Optional<TimePartitioning.Type> timePartitioningType = config.getTimePartitioningType();
-    boolean allowSchemaUnionization = config.getBoolean(BigQuerySinkConfig.ALLOW_SCHEMA_UNIONIZATION_CONFIG);
     boolean sanitizeFieldNames = config.getBoolean(BigQuerySinkConfig.SANITIZE_FIELD_NAME_CONFIG);
     return new SchemaManager(schemaRetriever, schemaConverter, getBigQuery(),
         allowNewBigQueryFields, allowRequiredFieldRelaxation, allowSchemaUnionization,
@@ -513,15 +513,20 @@ private Storage getGcs() {
 
   private GcsToBqWriter getGcsWriter() {
     BigQuery bigQuery = getBigQuery();
-    // schemaManager shall only be needed for creating table hence do not fetch instance if not
-    // needed.
-    SchemaManager schemaManager = autoCreateTables ? getSchemaManager() : null;
+    boolean attemptSchemaUpdate = allowNewBigQueryFields
+        || allowRequiredFieldRelaxation
+        || allowSchemaUnionization;
+    // schemaManager shall only be needed for creating table or performing schema updates hence do
+    // not fetch instance if not needed.
+    boolean needsSchemaManager = autoCreateTables || attemptSchemaUpdate;
+    SchemaManager schemaManager = needsSchemaManager ? getSchemaManager() : null;
     return new GcsToBqWriter(getGcs(),
         bigQuery,
         schemaManager,
         retry,
         retryWait,
         autoCreateTables,
+        attemptSchemaUpdate,
         time);
   }
 
@@ -559,6 +564,7 @@ public void start(Map<String, String> properties) {
     retryWait = config.getLong(BigQuerySinkConfig.BIGQUERY_RETRY_WAIT_CONFIG);
     allowNewBigQueryFields = config.getBoolean(BigQuerySinkConfig.ALLOW_NEW_BIGQUERY_FIELDS_CONFIG);
     allowRequiredFieldRelaxation = config.getBoolean(BigQuerySinkConfig.ALLOW_BIGQUERY_REQUIRED_FIELD_RELAXATION_CONFIG);
+    allowSchemaUnionization = config.getBoolean(BigQuerySinkConfig.ALLOW_SCHEMA_UNIONIZATION_CONFIG);
     topicToPartitionTableId = new HashMap<>();
     bigQuery = new AtomicReference<>();
     schemaManager = new AtomicReference<>();

kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/row/GcsToBqWriter.java

Lines changed: 23 additions & 1 deletion
@@ -41,6 +41,7 @@
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import java.util.SortedMap;
@@ -63,6 +64,7 @@ public class GcsToBqWriter {
   private final Storage storage;
   private final BigQuery bigQuery;
   private final SchemaManager schemaManager;
+  private final boolean attemptSchemaUpdate;
   private final Time time;
   private int retries;
   private long retryWaitMs;
@@ -87,10 +89,12 @@ public GcsToBqWriter(Storage storage,
                        int retries,
                        long retryWaitMs,
                        boolean autoCreateTables,
+                       boolean attemptSchemaUpdate,
                        Time time) {
     this.storage = storage;
     this.bigQuery = bigQuery;
     this.schemaManager = schemaManager;
+    this.attemptSchemaUpdate = attemptSchemaUpdate;
     this.time = time;
 
     this.retries = retries;
@@ -130,6 +134,8 @@ public void writeRows(SortedMap<SinkRecord, RowToInsert> rows,
       timeout = Duration.ofMillis(retryWaitMs);
     }
 
+    List<SinkRecord> sinkRecords = new ArrayList<>(rows.keySet());
+
     // Check if the table specified exists
     // This error shouldn't be thrown. All tables should be created by the connector at startup
     Table table = executeWithRetry(() -> bigQuery.getTable(tableId), timeout);
@@ -139,7 +145,7 @@ public void writeRows(SortedMap<SinkRecord, RowToInsert> rows,
       logger.info("Table {} was not found. Creating the table automatically.", tableId);
       Boolean created =
           executeWithRetry(
-              () -> schemaManager.createTable(tableId, new ArrayList<>(rows.keySet())), timeout);
+              () -> schemaManager.createTable(tableId, sinkRecords), timeout);
       if (created == null || !created) {
         throw new BigQueryConnectException("Failed to create table " + tableId);
       }
@@ -151,6 +157,22 @@ public void writeRows(SortedMap<SinkRecord, RowToInsert> rows,
       throw new BigQueryConnectException("Failed to lookup table " + tableId);
     }
 
+    if (attemptSchemaUpdate && schemaManager != null && !sinkRecords.isEmpty()) {
+      Boolean schemaUpdated =
+          executeWithRetry(
+              () -> {
+                schemaManager.updateSchema(tableId, sinkRecords);
+                return Boolean.TRUE;
+              },
+              timeout
+          );
+      if (schemaUpdated == null) {
+        throw new ConnectException(
+            String.format("Failed to update schema for table %s within %d re-attempts.", tableId, retries)
+        );
+      }
+    }
+
     // --- Upload rows to GCS with executeWithRetry (fresh budget for uploads) ---
     Duration uploadTimeout = Duration.ofMillis(Math.max(0L, retryWaitMs * Math.max(1, retries)));
     if (retries == 0) {
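The null check on schemaUpdated above leans on the same contract as the existing createTable call: executeWithRetry returns null once the retry budget is exhausted, and the caller decides how to fail. The real helper is private to GcsToBqWriter and may differ; the following is only an illustrative sketch of that contract, with hypothetical names.

import java.util.concurrent.Callable;

// Illustrative sketch of the retry contract writeRows relies on: run the action,
// retry on failure with a fixed wait between attempts, and return null once the
// budget is exhausted so the caller can raise its own exception.
final class RetrySketch {
  static <T> T executeWithRetry(Callable<T> action, int retries, long retryWaitMs)
      throws InterruptedException {
    for (int attempt = 0; attempt <= retries; attempt++) {
      try {
        return action.call();
      } catch (Exception e) {
        if (attempt == retries) {
          break; // out of attempts; fall through to the null return below
        }
        Thread.sleep(retryWaitMs);
      }
    }
    return null; // callers translate this into BigQueryConnectException / ConnectException
  }
}

With that contract, a null schemaUpdated is handled the same way as a failed table creation: the write fails fast with a ConnectException instead of staging rows that the subsequent load job could reject.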
GcsBatchSchemaEvolutionIT.java (new file)

Lines changed: 265 additions & 0 deletions
@@ -0,0 +1,265 @@
/*
 * Copyright 2024 Copyright 2022 Aiven Oy and
 * bigquery-connector-for-apache-kafka project contributors
 *
 * This software contains code derived from the Confluent BigQuery
 * Kafka Connector, Copyright Confluent, Inc, which in turn
 * contains code derived from the WePay BigQuery Kafka Connector,
 * Copyright WePay, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package com.wepay.kafka.connect.bigquery.integration;

import static io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG;
import static org.apache.kafka.connect.runtime.ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG;
import static org.apache.kafka.connect.runtime.ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG;
import static org.apache.kafka.connect.runtime.SinkConnectorConfig.TOPICS_CONFIG;
import static org.apache.kafka.test.TestUtils.waitForCondition;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.Field;
import com.google.cloud.bigquery.LegacySQLTypeName;
import com.google.cloud.bigquery.StandardTableDefinition;
import com.google.cloud.bigquery.Table;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.TableInfo;
import com.wepay.kafka.connect.bigquery.config.BigQuerySinkConfig;
import com.wepay.kafka.connect.bigquery.integration.utils.BucketClearer;
import com.wepay.kafka.connect.bigquery.integration.utils.SchemaRegistryTestUtils;
import com.wepay.kafka.connect.bigquery.integration.utils.TableClearer;
import com.wepay.kafka.connect.bigquery.retrieve.IdentitySchemaRetriever;
import io.confluent.connect.avro.AvroConverter;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.storage.Converter;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;

@Tag("integration")
public class GcsBatchSchemaEvolutionIT extends BaseConnectorIT {

  private static final String CONNECTOR_NAME = "gcs-schema-evolution-connector";
  private static final int TASKS_MAX = 1;
  private static final Duration LOAD_TIMEOUT = Duration.ofMinutes(2);

  private BigQuery bigQuery;
  private SchemaRegistryTestUtils schemaRegistry;
  private String schemaRegistryUrl;
  private Converter keyConverter;
  private Converter valueConverter;
  private Schema keySchema;
  private Schema valueSchemaV1;
  private Schema valueSchemaV2;
  private String topic;
  private String table;
  private String bucketName;

  @BeforeEach
  public void setup() throws Exception {
    startConnect();
    bigQuery = newBigQuery();

    schemaRegistry = new SchemaRegistryTestUtils(connect.kafka().bootstrapServers());
    schemaRegistry.start();
    schemaRegistryUrl = schemaRegistry.schemaRegistryUrl();

    initialiseSchemas();
    initialiseConverters();

    topic = suffixedTableOrTopic("gcs_schema_evolution");
    table = suffixedAndSanitizedTable("gcs_schema_evolution");
    bucketName = gcsBucket() + "-" + System.nanoTime();

    connect.kafka().createTopic(topic);
    TableClearer.clearTables(bigQuery, dataset(), table);
    createInitialTable();
  }

  @AfterEach
  public void tearDown() throws Exception {
    try {
      if (connect != null) {
        connect.deleteConnector(CONNECTOR_NAME);
      }
    } finally {
      if (schemaRegistry != null) {
        schemaRegistry.stop();
      }
      if (bigQuery != null) {
        TableClearer.clearTables(bigQuery, dataset(), table);
      }
      BucketClearer.clearBucket(keyFile(), project(), bucketName, gcsFolder(), keySource());
      stopConnect();
    }
  }

  @Test
  public void testSchemaEvolutionAcrossBatchLoads() throws Exception {
    connect.configureConnector(CONNECTOR_NAME, connectorProps());
    waitForConnectorToStart(CONNECTOR_NAME, TASKS_MAX);

    schemaRegistry.produceRecordsWithKey(
        keyConverter,
        valueConverter,
        Collections.singletonList(recordV1(1L, "snacks")),
        topic
    );

    waitForCommittedRecords(CONNECTOR_NAME, topic, 1, TASKS_MAX);
    waitForRowCount(1L);

    schemaRegistry.produceRecordsWithKey(
        keyConverter,
        valueConverter,
        Collections.singletonList(recordV2(2L, null, "john")),
        topic
    );

    waitForCommittedRecords(CONNECTOR_NAME, topic, 2, TASKS_MAX);
    waitForRowCount(2L);

    Table destinationTable = bigQuery.getTable(dataset(), table);
    com.google.cloud.bigquery.Schema destinationSchema = destinationTable.getDefinition().getSchema();
    Field categoryField = destinationSchema.getFields().get("category");
    assertNotNull(categoryField, "category field should exist after load");
    assertEquals(Field.Mode.NULLABLE, categoryField.getMode());
    Field usernameField = destinationSchema.getFields().get("username");
    assertNotNull(usernameField, "username field should be created");
    assertEquals(Field.Mode.NULLABLE, usernameField.getMode());

    List<List<Object>> rows = readAllRows(bigQuery, table, "id");
    assertEquals(Arrays.asList(1L, "snacks", null), rows.get(0));
    assertEquals(Arrays.asList(2L, null, "john"), rows.get(1));
  }

  private Map<String, String> connectorProps() {
    Map<String, String> props = baseConnectorProps(TASKS_MAX);
    props.put(TOPICS_CONFIG, topic);
    props.put(
        KEY_CONVERTER_CLASS_CONFIG,
        AvroConverter.class.getName()
    );
    props.put(
        KEY_CONVERTER_CLASS_CONFIG + "." + SCHEMA_REGISTRY_URL_CONFIG,
        schemaRegistryUrl
    );
    props.put(
        VALUE_CONVERTER_CLASS_CONFIG,
        AvroConverter.class.getName()
    );
    props.put(
        VALUE_CONVERTER_CLASS_CONFIG + "." + SCHEMA_REGISTRY_URL_CONFIG,
        schemaRegistryUrl
    );
    props.put(BigQuerySinkConfig.ALLOW_NEW_BIGQUERY_FIELDS_CONFIG, "true");
    props.put(BigQuerySinkConfig.ALLOW_BIGQUERY_REQUIRED_FIELD_RELAXATION_CONFIG, "true");
    props.put(BigQuerySinkConfig.ENABLE_BATCH_CONFIG, topic + "," + table);
    props.put(BigQuerySinkConfig.BATCH_LOAD_INTERVAL_SEC_CONFIG, "5");
    props.put(BigQuerySinkConfig.GCS_BUCKET_NAME_CONFIG, bucketName);
    props.put(BigQuerySinkConfig.GCS_FOLDER_NAME_CONFIG, gcsFolder());
    props.put(BigQuerySinkConfig.SCHEMA_RETRIEVER_CONFIG, IdentitySchemaRetriever.class.getName());
    props.put(BigQuerySinkConfig.TABLE_CREATE_CONFIG, "false");
    return props;
  }

  private void initialiseSchemas() {
    keySchema = SchemaBuilder.struct()
        .name("com.wepay.kafka.connect.bigquery.integration.Key")
        .field("id", Schema.INT64_SCHEMA)
        .build();

    valueSchemaV1 = SchemaBuilder.struct()
        .name("com.wepay.kafka.connect.bigquery.integration.ValueV1")
        .field("id", Schema.INT64_SCHEMA)
        .field("category", Schema.STRING_SCHEMA)
        .build();

    valueSchemaV2 = SchemaBuilder.struct()
        .name("com.wepay.kafka.connect.bigquery.integration.ValueV2")
        .field("id", Schema.INT64_SCHEMA)
        .field("category", SchemaBuilder.string().optional().build())
        .field("username", SchemaBuilder.string().optional().build())
        .build();
  }

  private void initialiseConverters() {
    keyConverter = new AvroConverter();
    valueConverter = new AvroConverter();
    Map<String, Object> keyConfig = new HashMap<>();
    keyConfig.put(SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
    keyConverter.configure(keyConfig, true);
    Map<String, Object> valueConfig = new HashMap<>();
    valueConfig.put(SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
    valueConverter.configure(valueConfig, false);
  }

  private List<SchemaAndValue> recordV1(long id, String category) {
    Struct key = new Struct(keySchema)
        .put("id", id);
    Struct value = new Struct(valueSchemaV1)
        .put("id", id)
        .put("category", category);
    List<SchemaAndValue> record = new ArrayList<>(2);
    record.add(new SchemaAndValue(keySchema, key));
    record.add(new SchemaAndValue(valueSchemaV1, value));
    return record;
  }

  private List<SchemaAndValue> recordV2(long id, String category, String username) {
    Struct key = new Struct(keySchema)
        .put("id", id);
    Struct value = new Struct(valueSchemaV2)
        .put("id", id)
        .put("category", category)
        .put("username", username);
    List<SchemaAndValue> record = new ArrayList<>(2);
    record.add(new SchemaAndValue(keySchema, key));
    record.add(new SchemaAndValue(valueSchemaV2, value));
    return record;
  }

  private void createInitialTable() {
    TableId tableId = TableId.of(dataset(), table);
    com.google.cloud.bigquery.Schema schema = com.google.cloud.bigquery.Schema.of(
        Field.newBuilder("id", LegacySQLTypeName.INTEGER).setMode(Field.Mode.REQUIRED).build(),
        Field.newBuilder("category", LegacySQLTypeName.STRING).setMode(Field.Mode.REQUIRED).build()
    );
    bigQuery.create(TableInfo.newBuilder(tableId, StandardTableDefinition.of(schema)).build());
  }

  private void waitForRowCount(long expected) throws InterruptedException {
    waitForCondition(() -> {
      try {
        return countRows(bigQuery, table) >= expected;
      } catch (Exception e) {
        return false;
      }
    }, LOAD_TIMEOUT.toMillis(), "Timed out waiting for " + expected + " rows");
  }
}
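For quick orientation, the schema transition this test asserts (summarised from createInitialTable and the assertions above):

  before the loads:  id INTEGER REQUIRED, category STRING REQUIRED
  after both loads:  category relaxed to STRING NULLABLE, username STRING NULLABLE added
                     (row 1 carries a null username; row 2 carries a null category)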
