Conversation

@veliuenal (Contributor) commented Sep 18, 2025

Overview

Closes #96

Schema evolution was not applied on the GCS batch path. Even with allowNewBigQueryFields and/or allowSchemaUnionization enabled, new columns were not added and required fields were not relaxed, causing LOAD jobs to fail with "No such field" errors and rejected rows.

This change enables the GCS → BigQuery batch path to update destination table schemas before issuing LOAD jobs.
When schema evolution is enabled, the batch writer derives a BigQuery schema from the batch’s records and calls SchemaManager.updateSchema(...) within the existing retry/timeout budget. If all schema-evolution flags (allowNewBigQueryFields, allowBigQueryRequiredFieldRelaxation, allowSchemaUnionization) are disabled, the writer bypasses this step, preserving current behavior.
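
As a rough sketch of the gating decision (variable names are illustrative; the config keys are the real connector settings listed above):

    // Sketch only: flags read once from the connector config at start-up.
    boolean allowNewFields = config.getBoolean("allowNewBigQueryFields");
    boolean allowFieldRelaxation = config.getBoolean("allowBigQueryRequiredFieldRelaxation");
    boolean allowUnionization = config.getBoolean("allowSchemaUnionization");

    // Pre-patching on the GCS batch path runs only when at least one
    // schema-evolution flag is enabled; otherwise behavior is unchanged.
    boolean attemptSchemaUpdate = allowNewFields || allowFieldRelaxation || allowUnionization;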

Implementation Details

  • BigQuerySinkTask

    • Capture and persist all three schema-evolution flags at start-up:

      • allowNewBigQueryFields
      • allowBigQueryRequiredFieldRelaxation
      • allowSchemaUnionization
    • Supply these flags to the GCS writer so it uses SchemaManager only when pre-patching is required (at least one flag enabled).

  • GcsToBqWriter

    • After ensuring the table exists, convert the batch’s schema to a BigQuery schema and invoke:

      schemaManager.updateSchema(tableId, desiredSchema, allowAdd, allowRelax, allowUnion)

      inside the existing retry/timeout wrapper, before uploading the batch payload to GCS (see the sketch after this list).

    • If retries are exhausted, raise a clear error.

    • No-op when all schema-evolution flags are disabled, preserving strict backward compatibility.
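
A hedged sketch of the pre-patch step, assuming hypothetical field names for the flags captured above; the retry budget fields (retries, retryWaitMs) appear in the writer's constructor elsewhere in this PR, and only the shape of the updateSchema(...) call is taken from the description:

    // Sketch, not the PR's exact code: pre-patch the destination schema inside
    // the existing retry budget, after the table-existence check and before
    // uploading the batch payload to GCS.
    // Assumed imports: com.google.cloud.bigquery.{BigQueryException, Schema, TableId},
    // org.apache.kafka.common.utils.Time, org.apache.kafka.connect.errors.ConnectException.
    private void attemptSchemaUpdate(TableId tableId, Schema desiredSchema) {
      int attempt = 0;
      while (true) {
        try {
          // Call shape taken from this PR's description; the three flags
          // were captured at start-up (see the gating sketch above).
          schemaManager.updateSchema(
              tableId, desiredSchema, allowNewFields, allowFieldRelaxation, allowUnionization);
          return;
        } catch (BigQueryException e) {
          if (++attempt > retries) {
            // Retries exhausted: raise a clear error rather than let the LOAD job fail later.
            throw new ConnectException(
                "Failed to update schema of " + tableId + " after " + retries + " attempts", e);
          }
          time.sleep(retryWaitMs);
        }
      }
    }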

Testing

Integration test:

GcsBatchSchemaEvolutionIT sends two batches through the GCS path. The first uses the original schema; the second relaxes a previously required field and adds a new nullable column. The table schema is updated before the second load, so both fields are NULLABLE and the new column appears as expected.
The load completes as expected and the data aligns with the evolved schema. This behavior confirms that the batch path now respects schema-relaxation and field-addition settings.
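
For illustration, the two batches' Connect schemas could look roughly like this (field names are hypothetical, not necessarily the IT's actual ones):

    import org.apache.kafka.connect.data.Schema;
    import org.apache.kafka.connect.data.SchemaBuilder;

    // Batch 1: original schema with a required field.
    Schema original = SchemaBuilder.struct().name("record")
        .field("id", Schema.STRING_SCHEMA)              // maps to REQUIRED in BigQuery
        .build();

    // Batch 2: the previously required field is relaxed and a new
    // nullable column is added, exercising both evolution paths.
    Schema evolved = SchemaBuilder.struct().name("record")
        .field("id", Schema.OPTIONAL_STRING_SCHEMA)     // relaxed to NULLABLE
        .field("notes", Schema.OPTIONAL_STRING_SCHEMA)  // new NULLABLE column
        .build();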

For local testing, I added a short pause (about 10 seconds) between the schema update and the second load to watch the schema change show up in BigQuery before proceeding. The screenshots below show the table schema before and after the second record was inserted.

[Screenshots: table schema before and after the second load]

Unit Tests

[INFO] Tests run: 2, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.293 s -- in com.wepay.kafka.connect.bigquery.write.storage.StorageWriteApiWriterTest
[INFO] Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.037 s -- in com.wepay.kafka.connect.bigquery.write.storage.BigQueryBuilderTest
[INFO] Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.007 s -- in com.wepay.kafka.connect.bigquery.write.storage.BigQueryWriteSettingsBuilderTest
[INFO] Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.030 s -- in com.wepay.kafka.connect.bigquery.write.storage.GcsBuilderTest
[INFO] Tests run: 17, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.127 s -- in com.wepay.kafka.connect.bigquery.write.storage.StorageWriteApiBatchApplicationStreamTest
[INFO] Tests run: 3, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.004 s -- in com.wepay.kafka.connect.bigquery.write.storage.StorageApiBatchModeHandlerTest
[INFO] Tests run: 15, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.047 s -- in com.wepay.kafka.connect.bigquery.write.storage.StorageWriteApiDefaultStreamTest
[INFO] Tests run: 5, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.131 s -- in com.wepay.kafka.connect.bigquery.write.row.BigQueryWriterTest
[INFO] Tests run: 7, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.019 s -- in com.wepay.kafka.connect.bigquery.write.row.GcsToBqWriterTest
[INFO] Tests run: 6, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.158 s -- in com.wepay.kafka.connect.bigquery.GcpClientBuilderProjectTest
[INFO] Tests run: 6, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.024 s -- in com.wepay.kafka.connect.bigquery.config.GcsBucketValidatorTest
[INFO] Tests run: 33, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.025 s -- in com.wepay.kafka.connect.bigquery.config.BigQuerySinkConfigTest
[INFO] Tests run: 4, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.002 s -- in com.wepay.kafka.connect.bigquery.config.PartitioningModeValidatorTest
[INFO] Tests run: 4, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.002 s -- in com.wepay.kafka.connect.bigquery.config.MultiPropertyValidatorTest
[INFO] Tests run: 3, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.006 s -- in com.wepay.kafka.connect.bigquery.config.CredentialsValidatorTest
[INFO] Tests run: 11, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.006 s -- in com.wepay.kafka.connect.bigquery.config.StorageWriteApiValidatorTest
[INFO] Tests run: 4, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.002 s -- in com.wepay.kafka.connect.bigquery.config.PartitioningTypeValidatorTest
[INFO] Tests run: 4, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.001 s -- in com.wepay.kafka.connect.bigquery.utils.PartitionedTableIdTest
[INFO] Tests run: 3, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.002 s -- in com.wepay.kafka.connect.bigquery.utils.FieldNameSanitizerTest
[INFO] Tests run: 2, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0 s -- in com.wepay.kafka.connect.bigquery.ErrantRecordHandlerTest
[INFO] Tests run: 23, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.174 s -- in com.wepay.kafka.connect.bigquery.BigQuerySinkTaskTest
[INFO] Tests run: 6, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.127 s -- in com.wepay.kafka.connect.bigquery.BigQueryStorageApiBatchSinkTaskTest
[INFO] Tests run: 5, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.002 s -- in com.wepay.kafka.connect.bigquery.BigQuerySinkConnectorTest
[INFO] Tests run: 12, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.036 s -- in com.wepay.kafka.connect.bigquery.MergeQueriesTest
[INFO] Tests run: 3, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.112 s -- in com.wepay.kafka.connect.bigquery.BigQueryStorageApiSinkTaskTest
[INFO] Tests run: 17, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.007 s -- in com.wepay.kafka.connect.bigquery.exception.BigQueryStorageWriteApiErrorResponsesTest
[INFO] Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.001 s -- in com.wepay.kafka.connect.bigquery.exception.BigQueryErrorResponsesTest
[INFO] Tests run: 2, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.001 s -- in com.wepay.kafka.connect.bigquery.exception.BigQueryStorageWriteApiConnectExceptionTest
[INFO] Tests run: 39, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.058 s -- in com.wepay.kafka.connect.bigquery.SchemaManagerTest
[INFO] Tests run: 8, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.055 s -- in com.wepay.kafka.connect.bigquery.GcsToBqLoadRunnableTest
[INFO] Tests run: 12, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.005 s -- in com.wepay.kafka.connect.bigquery.convert.KafkaDataConverterTest
[INFO] Tests run: 42, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.086 s -- in com.wepay.kafka.connect.bigquery.convert.BigQuerySchemaConverterTest
[INFO] Tests run: 37, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.025 s -- in com.wepay.kafka.connect.bigquery.convert.BigQueryRecordConverterTest
[INFO] Tests run: 7, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.002 s -- in com.wepay.kafka.connect.bigquery.convert.logicaltype.KafkaLogicalConvertersTest
[INFO] Tests run: 12, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.005 s -- in com.wepay.kafka.connect.bigquery.convert.logicaltype.DebeziumLogicalConvertersTest
[INFO] Results:
[INFO] 
[INFO] Tests run: 358, Failures: 0, Errors: 0, Skipped: 0
[INFO] 
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time:  6.837 s
[INFO] Finished at: 2025-09-18T14:04:37+02:00
[INFO] ------------------------------------------------------------------------

@veliuenal (Author) commented:

Hello @Claudenw
This PR aims to resolve the issue mentioned here: #96
Could you please review it?
Thanks in advance.

}

@Test
public void schemaUpdateSkippedWhenEnabled() throws Exception {

Contributor:

This name makes no sense to me. schemaUpdateSkipped, but the test verifies that it was not skipped, right?

Is there a better name for this?

Contributor Author:

My bad, corrected 👍

  GcsToBqWriter writer =
      new GcsToBqWriter(
-         storage, bigQuery, schemaManager, retries, retryWaitMs, autoCreate, mockTime);
+         storage, bigQuery, schemaManager, retries, retryWaitMs, autoCreate,false, mockTime);

Contributor:

should be spaces around the "false" in the constructor.
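
With the spacing fixed, the new line would read:

    storage, bigQuery, schemaManager, retries, retryWaitMs, autoCreate, false, mockTime);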

@veliuenal requested a review from Claudenw on September 18, 2025 at 15:11
@@ -0,0 +1,242 @@
package com.wepay.kafka.connect.bigquery.integration;

Contributor:

This file needs the standard header for the build to complete.

Contributor Author:

Added it 👍

@Claudenw (Contributor) left a comment:

Please take a look at #88 and let me know if that change is going to impact your changes. If so how can we make both work?

@veliuenal (Author) commented:

@Claudenw thanks for the heads up.
#88 doesn't impact our work.
They’re adding an option to control how unknown fields in Kafka records are handled when writing to BigQuery.
However, their feature is only being added to the BigQueryWriter, not the GcsToBqWriter class, which manages the GCS-to-BQ tasks. From what I can see, none of the functions or classes related to the GCS-to-BQ job are touched.

@veliuenal requested a review from Claudenw on September 19, 2025 at 13:42
@Claudenw merged commit 595b284 into Aiven-Open:main on Sep 19, 2025
1 check passed
Linked issue: #96 (Implement schema evolution for the GCS-to-BQ load path)