Commit 7770274

Merge branch '2.6.1-RC' into dependabot/maven/org.apache.kafka-kafka-clients-5.3.0-ccs

2 parents: f464a07 + c8b216b
File tree

4 files changed: +180, -39 lines

.github/workflows/manual.yml

Lines changed: 110 additions & 0 deletions
@@ -0,0 +1,110 @@
+#
+# Copyright 2024 Copyright 2022 Aiven Oy and
+# bigquery-connector-for-apache-kafka project contributors
+#
+# This software contains code derived from the Confluent BigQuery
+# Kafka Connector, Copyright Confluent, Inc, which in turn
+# contains code derived from the WePay BigQuery Kafka Connector,
+# Copyright WePay, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Workflow to check pull requests and new commits to main branches
+# This checks the source in the state as if after the merge.
+name: Manual workflow
+on:
+  workflow_dispatch:
+
+permissions:
+  contents: write
+  pull-requests: write
+  issues: write
+
+
+# Disallow concurrent runs for the same PR by cancelling in-progress runs
+# when new commits are pushed
+concurrency:
+  group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }}
+  cancel-in-progress: true
+
+jobs:
+  build:
+    name: Build
+    runs-on: ubuntu-latest
+    steps:
+      - name: Checkout code
+        uses: actions/checkout@v2
+      - name: Dump GitHub context
+        env:
+          GITHUB_CONTEXT: ${{ toJson(github) }}
+        run: echo "$GITHUB_CONTEXT"
+      - name: Test
+        env:
+          TEST_GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
+          GCP_CREDENTIALS: ${{ secrets.GCP_CREDENTIALS }}
+        run: |
+          echo ${#TEST_GITHUB_TOKEN}
+          echo ${#GCP_CREDENTIALS}
+#      - name: Set up JDK 17
+#        uses: actions/setup-java@v4
+#        with:
+#          distribution: 'adopt'
+#          java-version: 17
+#          cache: maven
+#      - name: License header check
+#        run: |
+#          mvn -ntp license:remove license:format
+#          if [[ -n $(git status -s) ]]; then
+#            echo 1>&2 'Some files do not have the correct license header:'
+#            git diff --name-only 1>&2
+#            echo 1>&2 'Please update the license headers for these files by running `mvn license:remove license:format`'
+#            exit 1
+#          fi
+#      - name: Build (Maven)
+#        run: mvn -P ci --batch-mode clean package -DskipTests
+#      - name: Unit tests (Maven)
+#        run: mvn -ntp -P ci --batch-mode test
+#      - name: "upload build failure reports"
+#        uses: actions/upload-artifact@v4
+#        if: failure()
+#        with:
+#          name: unit-test-results
+#          path: |
+#            **/target/*-reports/**
+#          retention-days: 1
+#      - name: Integration tests (Maven)
+#        env:
+#          # Necessary for client builder integration tests that run with
+#          # default application credentials
+#          CREDENTIALS_JSON: ${{ secrets.GCP_CREDENTIALS }}
+#          GOOGLE_APPLICATION_CREDENTIALS: /tmp/creds.json
+#          KCBQ_TEST_KEYFILE: ${{ secrets.GCP_CREDENTIALS }}
+#          KCBQ_TEST_KEY_SOURCE: JSON
+#          KCBQ_TEST_PROJECT: ${{ secrets.KCBQ_TEST_PROJECT }}
+#          KCBQ_TEST_DATASET: ${{ secrets.KCBQ_TEST_DATASET }}
+#          KCBQ_TEST_BUCKET: ${{ secrets.KCBQ_TEST_BUCKET }}
+#        run: |
+#          export
+#          echo "$CREDENTIALS_JSON" > /tmp/creds.json
+#          export KCBQ_TEST_TABLE_SUFFIX=_$(date +%s)_$RANDOM
+#          mvn -ntp -P ci -Dskip.unit.tests=true verify
+#      - name: Upload integration test results (Maven)
+#        if: always()
+#        uses: actions/upload-artifact@v4
+#        with:
+#          path: |
+#            **/target/failsafe-reports/*
+#          name: integration-test-results
+#          retention-days: 3

kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/config/BigQuerySinkConfig.java

Lines changed: 54 additions & 27 deletions
@@ -35,11 +35,14 @@
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -51,11 +54,22 @@
 import org.apache.kafka.common.config.types.Password;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.sink.SinkConnector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Base class for connector and task configs; contains properties shared between the two of them.
  */
 public class BigQuerySinkConfig extends AbstractConfig {
+
+  private static final Logger logger = LoggerFactory.getLogger(BigQuerySinkConfig.class);
+
+  public static final String DEPRECATED_DOC = "(DEPRECATED)";
+  public static final String GCS_LOAD_DEPRECATION_NOTICE =
+      "GCS batch loading has been deprecated and will be removed in a future major release.";
+  public static final String DECORATOR_SYNTAX_DEPRECATION_NOTICE =
+      "Use of partition decorator syntax has been deprecated and will be removed in a future release.";
+
   // Values taken from https://github.com/apache/kafka/blob/1.1.1/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SinkConnectorConfig.java#L33
   public static final String TOPICS_CONFIG = SinkConnector.TOPICS_CONFIG;
   public static final String TOPICS_DEFAULT = "";
@@ -174,7 +188,7 @@ public class BigQuerySinkConfig extends AbstractConfig {
   private static final List<String> ENABLE_BATCH_DEFAULT = Collections.emptyList();
   private static final ConfigDef.Importance ENABLE_BATCH_IMPORTANCE = ConfigDef.Importance.LOW;
   private static final String ENABLE_BATCH_DOC =
-      "Beta Feature; use with caution: The sublist of topics to be batch loaded through GCS";
+      "The sublist of topics to be batch loaded through GCS.";
   private static final ConfigDef.Type BATCH_LOAD_INTERVAL_SEC_TYPE = ConfigDef.Type.INT;
   private static final Integer BATCH_LOAD_INTERVAL_SEC_DEFAULT = 120;
   private static final ConfigDef.Importance BATCH_LOAD_INTERVAL_SEC_IMPORTANCE =
@@ -541,12 +555,31 @@
 
   protected BigQuerySinkConfig(ConfigDef config, Map<String, String> properties) {
     super(config, properties);
+    logDeprecationWarnings();
   }
 
   public BigQuerySinkConfig(Map<String, String> properties) {
     this(getConfig(), properties);
   }
 
+  private void logDeprecationWarnings() {
+    if (!getList(ENABLE_BATCH_CONFIG).isEmpty()) {
+      logger.warn(
+          GCS_LOAD_DEPRECATION_NOTICE
+              + " To disable this feature, remove the {} property from the connector configuration",
+          ENABLE_BATCH_CONFIG
+      );
+    }
+
+    if (getBoolean(BIGQUERY_PARTITION_DECORATOR_CONFIG)) {
+      logger.warn(
+          DECORATOR_SYNTAX_DEPRECATION_NOTICE
+              + " To disable this feature, set the {} property to false in the connector configuration",
+          BIGQUERY_PARTITION_DECORATOR_CONFIG
+      );
+    }
+  }
+
   /**
    * Return the ConfigDef object used to define this config's fields.
    *
@@ -579,25 +612,25 @@ public static ConfigDef getConfig() {
         ENABLE_BATCH_TYPE,
         ENABLE_BATCH_DEFAULT,
         ENABLE_BATCH_IMPORTANCE,
-        ENABLE_BATCH_DOC
+        deprecatedGcsLoadDoc(ENABLE_BATCH_DOC)
     ).define(
         BATCH_LOAD_INTERVAL_SEC_CONFIG,
         BATCH_LOAD_INTERVAL_SEC_TYPE,
         BATCH_LOAD_INTERVAL_SEC_DEFAULT,
         BATCH_LOAD_INTERVAL_SEC_IMPORTANCE,
-        BATCH_LOAD_INTERVAL_SEC_DOC
+        deprecatedGcsLoadDoc(BATCH_LOAD_INTERVAL_SEC_DOC)
     ).define(
         GCS_BUCKET_NAME_CONFIG,
         GCS_BUCKET_NAME_TYPE,
         GCS_BUCKET_NAME_DEFAULT,
         GCS_BUCKET_NAME_IMPORTANCE,
-        GCS_BUCKET_NAME_DOC
+        deprecatedGcsLoadDoc(GCS_BUCKET_NAME_DOC)
     ).define(
         GCS_FOLDER_NAME_CONFIG,
         GCS_FOLDER_NAME_TYPE,
         GCS_FOLDER_NAME_DEFAULT,
         GCS_FOLDER_NAME_IMPORTANCE,
-        GCS_FOLDER_NAME_DOC
+        deprecatedGcsLoadDoc(GCS_FOLDER_NAME_DOC)
     ).define(
         PROJECT_CONFIG,
         PROJECT_TYPE,
@@ -691,7 +724,7 @@ public static ConfigDef getConfig() {
         AUTO_CREATE_BUCKET_TYPE,
         AUTO_CREATE_BUCKET_DEFAULT,
         AUTO_CREATE_BUCKET_IMPORTANCE,
-        AUTO_CREATE_BUCKET_DOC
+        deprecatedGcsLoadDoc(AUTO_CREATE_BUCKET_DOC)
     ).define(
         ALLOW_NEW_BIGQUERY_FIELDS_CONFIG,
         ALLOW_NEW_BIGQUERY_FIELDS_TYPE,
@@ -776,13 +809,13 @@ public static ConfigDef getConfig() {
         BIGQUERY_MESSAGE_TIME_PARTITIONING_CONFIG_TYPE,
         BIGQUERY_MESSAGE_TIME_PARTITIONING_DEFAULT,
         BIGQUERY_MESSAGE_TIME_PARTITIONING_IMPORTANCE,
-        BIGQUERY_MESSAGE_TIME_PARTITIONING_DOC
+        deprecatedPartitionSyntaxDoc(BIGQUERY_MESSAGE_TIME_PARTITIONING_DOC)
     ).define(
         BIGQUERY_PARTITION_DECORATOR_CONFIG,
         BIGQUERY_PARTITION_DECORATOR_CONFIG_TYPE,
         BIGQUERY_PARTITION_DECORATOR_DEFAULT,
        BIGQUERY_PARTITION_DECORATOR_IMPORTANCE,
-        BIGQUERY_PARTITION_DECORATOR_DOC
+        deprecatedPartitionSyntaxDoc(BIGQUERY_PARTITION_DECORATOR_DOC)
     ).define(
         BIGQUERY_TIMESTAMP_PARTITION_FIELD_NAME_CONFIG,
         BIGQUERY_TIMESTAMP_PARTITION_FIELD_NAME_TYPE,
@@ -879,18 +912,6 @@ public boolean visible(String s, Map<String, Object> map) {
     );
   }
 
-  public static boolean upsertDeleteEnabled(Map<String, String> props) {
-    String upsertStr = props.get(UPSERT_ENABLED_CONFIG);
-    String deleteStr = props.get(DELETE_ENABLED_CONFIG);
-    return Boolean.TRUE.toString().equalsIgnoreCase(upsertStr)
-        || Boolean.TRUE.toString().equalsIgnoreCase(deleteStr);
-  }
-
-  public static boolean gcsBatchLoadingEnabled(Map<String, String> props) {
-    String batchLoadStr = props.get(ENABLE_BATCH_CONFIG);
-    return batchLoadStr != null && !batchLoadStr.isEmpty();
-  }
-
   /**
    * Used in conjunction with {@link com.wepay.kafka.connect.bigquery.BigQuerySinkConnector#validate(Map)} to perform
    * preflight configuration checks. Simple validations that only require a single property value at a time (such as
@@ -936,13 +957,6 @@ public GcpClientBuilder.KeySource getKeySource() {
     }
   }
 
-  /**
-   * Returns the keyfile
-   */
-  public String getKeyFile() {
-    return Optional.ofNullable(getPassword(KEYFILE_CONFIG)).map(Password::value).orElse(null);
-  }
-
   /**
    * Return a new instance of the configured Schema Converter.
    *
@@ -1134,4 +1148,17 @@ private static String header(String text) {
     return wrapper + "\n" + text + "\n" + wrapper + "\n";
   }
 
+  private static String deprecatedGcsLoadDoc(String doc) {
+    return deprecatedDoc(doc, GCS_LOAD_DEPRECATION_NOTICE);
+  }
+
+  public static String deprecatedPartitionSyntaxDoc(String doc) {
+    return deprecatedDoc(doc, DECORATOR_SYNTAX_DEPRECATION_NOTICE);
+  }
+
+  private static String deprecatedDoc(String doc, String notice) {
+    return DEPRECATED_DOC + " " + doc + " Warning: " + notice;
+  }
+
 }
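
The new doc-string helpers are plain string composition: every deprecated option gets the (DEPRECATED) marker up front and the relevant notice appended after the original doc text. A minimal standalone sketch of that composition (constants copied from the diff; the class name DeprecatedDocSketch and the main driver are illustrative only, not part of the connector):

public class DeprecatedDocSketch {
  static final String DEPRECATED_DOC = "(DEPRECATED)";
  static final String GCS_LOAD_DEPRECATION_NOTICE =
      "GCS batch loading has been deprecated and will be removed in a future major release.";

  // Mirrors BigQuerySinkConfig.deprecatedDoc(String, String) from the diff above.
  static String deprecatedDoc(String doc, String notice) {
    return DEPRECATED_DOC + " " + doc + " Warning: " + notice;
  }

  public static void main(String[] args) {
    // Same inputs as deprecatedGcsLoadDoc(ENABLE_BATCH_DOC):
    System.out.println(deprecatedDoc(
        "The sublist of topics to be batch loaded through GCS.",
        GCS_LOAD_DEPRECATION_NOTICE));
    // Prints (one line): (DEPRECATED) The sublist of topics to be batch loaded
    // through GCS. Warning: GCS batch loading has been deprecated and will be
    // removed in a future major release.
  }
}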

kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/row/GcsToBqWriter.java

Lines changed: 15 additions & 11 deletions
@@ -128,8 +128,21 @@ public void writeRows(SortedMap<SinkRecord, RowToInsert> rows,
 
     // Check if the table specified exists
     // This error shouldn't be thrown. All tables should be created by the connector at startup
-    if (autoCreateTables && bigQuery.getTable(tableId) == null) {
-      attemptTableCreate(tableId, new ArrayList<>(rows.keySet()));
+    int lookupAttempts = 0;
+    boolean lookupSuccess = bigQuery.getTable(tableId) != null;
+    BigQueryException lookupException = null;
+
+    if (autoCreateTables && !lookupSuccess) {
+      logger.info("Table {} was not found. Creating the table automatically.", tableId);
+      schemaManager.createTable(tableId, new ArrayList<>(rows.keySet()));
+      while (!lookupSuccess && lookupAttempts <= retries) {
+        waitRandomTime();
+        lookupSuccess = bigQuery.getTable(tableId) != null;
+        lookupAttempts++;
+      }
+    }
+    if (!lookupSuccess) {
+      throw new BigQueryConnectException("Failed to lookup table " + tableId, lookupException);
     }
 
     int attemptCount = 0;
@@ -200,13 +213,4 @@ private void waitRandomTime() throws InterruptedException {
     time.sleep(retryWaitMs + random.nextInt(WAIT_MAX_JITTER));
   }
 
-  private void attemptTableCreate(TableId tableId, List<SinkRecord> records) {
-    try {
-      logger.info("Table {} does not exist, auto-creating table ", tableId);
-      schemaManager.createTable(tableId, records);
-    } catch (BigQueryException exception) {
-      throw new BigQueryConnectException(
-          "Failed to create table " + tableId, exception);
-    }
-  }
 }
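
The replacement for attemptTableCreate turns the table lookup into a bounded retry loop: trigger auto-creation once, then poll until the table becomes visible or the retry budget is exhausted, sleeping a base interval plus random jitter between attempts. A self-contained sketch of that pattern under generic names (retryUntilTrue and its parameters are illustrative, not the connector's API; retries, retryWaitMs, and WAIT_MAX_JITTER mirror fields in GcsToBqWriter):

import java.util.Random;
import java.util.function.Supplier;

public class RetryLookupSketch {
  private static final int WAIT_MAX_JITTER = 1000;
  private static final Random random = new Random();

  static boolean retryUntilTrue(Supplier<Boolean> lookup, int retries, long retryWaitMs)
      throws InterruptedException {
    boolean success = lookup.get();
    int attempts = 0;
    while (!success && attempts <= retries) {
      // Base wait plus random jitter so parallel tasks don't poll in lockstep
      // (the same idea as waitRandomTime() in the diff above).
      Thread.sleep(retryWaitMs + random.nextInt(WAIT_MAX_JITTER));
      success = lookup.get();
      attempts++;
    }
    return success;
  }

  public static void main(String[] args) throws InterruptedException {
    long deadline = System.currentTimeMillis() + 1500;
    // Stand-in for bigQuery.getTable(tableId) != null: becomes true after ~1.5 s.
    boolean found = retryUntilTrue(() -> System.currentTimeMillis() > deadline, 5, 500);
    System.out.println(found ? "table visible" : "gave up");
  }
}

One detail visible in the diff: lookupException is declared but never assigned, so the BigQueryConnectException thrown when the lookup never succeeds always carries a null cause.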

pom.xml

Lines changed: 1 addition & 1 deletion
@@ -44,7 +44,7 @@
     <confluent.version>7.6.0</confluent.version>
     <debezium.version>0.6.2</debezium.version>
     <jackson.version>2.14.2</jackson.version>
-    <jetty.version>9.4.53.v20231009</jetty.version>
+    <jetty.version>9.4.56.v20240826</jetty.version>
     <kafka.version>3.9.1</kafka.version>
     <kafka.scala.version>2.12</kafka.scala.version>
     <slf4j.version>1.7.26</slf4j.version>
