Skip to content

Commit 2b987ed

Browse files
committed
Addressed comments
1 parent 0c728a5 commit 2b987ed

File tree

3 files changed

+40
-61
lines changed

3 files changed

+40
-61
lines changed

kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/RecordTableResolver.java

Lines changed: 29 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,26 @@
1+
/*
2+
* Copyright 2024 Aiven Oy and
3+
* bigquery-connector-for-apache-kafka project contributors
4+
*
5+
* This software contains code derived from the Confluent BigQuery
6+
* Kafka Connector, Copyright Confluent, Inc, which in turn
7+
* contains code derived from the WePay BigQuery Kafka Connector,
8+
* Copyright WePay, Inc.
9+
*
10+
* Licensed under the Apache License, Version 2.0 (the "License");
11+
* you may not use this file except in compliance with the License.
12+
* You may obtain a copy of the License at
13+
*
14+
* http://www.apache.org/licenses/LICENSE-2.0
15+
*
16+
* Unless required by applicable law or agreed to in writing,
17+
* software distributed under the License is distributed on an
18+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
19+
* KIND, either express or implied. See the License for the
20+
* specific language governing permissions and limitations
21+
* under the License.
22+
*/
23+
124
package com.wepay.kafka.connect.bigquery;
225

326
import com.google.cloud.bigquery.BigQuery;
@@ -31,7 +54,6 @@ class RecordTableResolver {
3154
private final boolean usePartitionDecorator;
3255
private final boolean upsertDelete;
3356
private final boolean useMessageTimeDatePartitioning;
34-
private final boolean useStorageApi;
3557

3658
public RecordTableResolver(BigQuerySinkTaskConfig config, MergeBatches mergeBatches, BigQuery bigQuery) {
3759
this.config = config;
@@ -40,11 +62,10 @@ public RecordTableResolver(BigQuerySinkTaskConfig config, MergeBatches mergeBatc
4062

4163
this.useMessageTimeDatePartitioning =
4264
config.getBoolean(BigQuerySinkConfig.BIGQUERY_MESSAGE_TIME_PARTITIONING_CONFIG);
43-
this.usePartitionDecorator =
44-
config.getBoolean(BigQuerySinkConfig.BIGQUERY_PARTITION_DECORATOR_CONFIG);
45-
this.useStorageApi =
46-
config.getBoolean(BigQuerySinkConfig.USE_STORAGE_WRITE_API_CONFIG);
47-
this.upsertDelete = !useStorageApi && (config.getBoolean(BigQuerySinkConfig.UPSERT_ENABLED_CONFIG)
65+
this.usePartitionDecorator = config.getBoolean(BigQuerySinkConfig.BIGQUERY_PARTITION_DECORATOR_CONFIG)
66+
&& !config.getBoolean(BigQuerySinkConfig.ENABLE_BATCH_MODE_CONFIG);
67+
this.upsertDelete = !config.getBoolean(BigQuerySinkConfig.USE_STORAGE_WRITE_API_CONFIG)
68+
&& (config.getBoolean(BigQuerySinkConfig.UPSERT_ENABLED_CONFIG)
4869
|| config.getBoolean(BigQuerySinkConfig.DELETE_ENABLED_CONFIG));
4970
}
5071

@@ -74,9 +95,7 @@ private TableId getBaseTableId(String topic) {
7495
return topicToTableId.computeIfAbsent(topic, topicName -> {
7596
String[] datasetAndTable = TableNameUtils.getDataSetAndTableName(config, topic);
7697
String project = config.getString(BigQuerySinkConfig.PROJECT_CONFIG);
77-
TableId baseTableId = (!useStorageApi)
78-
? TableId.of(datasetAndTable[0], datasetAndTable[1])
79-
: TableId.of(project, datasetAndTable[0], datasetAndTable[1]);
98+
TableId baseTableId = TableId.of(project, datasetAndTable[0], datasetAndTable[1]);
8099

81100
if (usePartitionDecorator) {
82101
validatePartitioningForDecorator(baseTableId);
@@ -90,7 +109,7 @@ private void validatePartitioningForDecorator(TableId tableId) {
90109
StandardTableDefinition definition = retrieveTableDefinition(tableId);
91110
if (definition == null) {
92111
// If we could not find table and its definition, ignore.
93-
// Table creation will potentially be handled later via SchemaManager
112+
// Table creation will potentially be handled later if auto create is enabled
94113
return;
95114
}
96115
TimePartitioning partitioning = definition.getTimePartitioning();

kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTaskTest.java

Lines changed: 0 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -823,47 +823,6 @@ public void testInterruptedException() {
823823
);
824824
}
825825

826-
@Test
827-
public void testTimePartitioningIncompatibleWithDecoratorSyntax() {
828-
final String topic = "t1";
829-
final String dataset = "d";
830-
831-
Map<String, String> properties = propertiesFactory.getProperties();
832-
properties.put(BigQuerySinkConfig.BIGQUERY_PARTITION_DECORATOR_CONFIG, "true");
833-
properties.put(BigQuerySinkConfig.BIGQUERY_MESSAGE_TIME_PARTITIONING_CONFIG, "true");
834-
properties.put(BigQuerySinkConfig.TOPICS_CONFIG, topic);
835-
properties.put(BigQuerySinkConfig.DEFAULT_DATASET_CONFIG, dataset);
836-
837-
Storage storage = mock(Storage.class);
838-
BigQuery bigQuery = mock(BigQuery.class);
839-
840-
TableId tableId = TableId.of(dataset, topic);
841-
StandardTableDefinition mockTableDefinition = mock(StandardTableDefinition.class);
842-
when(mockTableDefinition.getTimePartitioning()).thenReturn(TimePartitioning.of(TimePartitioning.Type.HOUR));
843-
Table table = mock(Table.class);
844-
when(table.getDefinition()).thenReturn(mockTableDefinition);
845-
when(bigQuery.getTable(tableId)).thenReturn(table);
846-
847-
BigQuerySinkTask testTask = new BigQuerySinkTask(
848-
bigQuery,
849-
null,
850-
storage,
851-
null,
852-
mockedStorageWriteApiDefaultStream,
853-
mockedBatchHandler,
854-
time
855-
);
856-
857-
SinkTaskContext sinkTaskContext = mock(SinkTaskContext.class);
858-
testTask.initialize(sinkTaskContext);
859-
testTask.start(properties);
860-
861-
assertThrows(
862-
ConnectException.class,
863-
() -> testTask.put(Collections.singleton(spoofSinkRecord(topic, "f1", "v1", TimestampType.CREATE_TIME, 1L)))
864-
);
865-
}
866-
867826
@Test
868827
public void testVersion() {
869828
assertNotNull(new BigQuerySinkTask().version());

kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/RecordTableResolverTest.java

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ public class RecordTableResolverTest {
2626
private static final String TOPIC = "topic";
2727
private static final String DATASET = "dataset";
2828
private static final String PROJECT = "project";
29-
private static final TableId BASE_TABLE_ID = TableId.of("dataset", "topic");
29+
private static final TableId BASE_TABLE_ID = TableId.of("project", "dataset", "topic");
3030
private BigQuerySinkTaskConfig mockConfig;
3131
private BigQuery mockBigQuery;
3232
private MergeBatches mockMergeBatches;
@@ -59,7 +59,6 @@ public void testResolvesTable() {
5959
TableId tableId = recordTableResolver.getRecordTable(mockRecord).getFullTableId();
6060

6161
assertEquals(TOPIC, tableId.getTable());
62-
assertNull(tableId.getProject());
6362
}
6463

6564
@Test
@@ -77,28 +76,30 @@ public void testUpsertDeleteResolvesIntermediateTable() {
7776
}
7877

7978
@Test
80-
public void testResolveTableWithStorageApi() {
79+
public void testResolveTableWithStorageApiIgnoresUpsertDelete() {
80+
when(mockConfig.getBoolean(BigQuerySinkConfig.UPSERT_ENABLED_CONFIG)).thenReturn(true);
8181
when(mockConfig.getBoolean(BigQuerySinkConfig.USE_STORAGE_WRITE_API_CONFIG)).thenReturn(true);
8282

83+
String intermediateTable = "intermediate_topic";
84+
TableId intermediateTableId = TableId.of(DATASET, intermediateTable);
85+
when(mockMergeBatches.intermediateTableFor(BASE_TABLE_ID)).thenReturn(intermediateTableId);
86+
8387
recordTableResolver = new RecordTableResolver(mockConfig, mockMergeBatches, mockBigQuery);
8488
TableId tableId = recordTableResolver.getRecordTable(mockRecord).getFullTableId();
8589

8690
assertEquals(TOPIC, tableId.getTable());
87-
assertEquals(PROJECT, tableId.getProject());
8891
}
8992

9093
@Test
91-
public void testResolveTableWithStorageApiIgnoresUpsertDelete() {
92-
when(mockConfig.getBoolean(BigQuerySinkConfig.UPSERT_ENABLED_CONFIG)).thenReturn(true);
94+
public void testPartitionDecoratorIgnoredWhenBatchModeEnabled() {
95+
when(mockConfig.getBoolean(BigQuerySinkConfig.BIGQUERY_PARTITION_DECORATOR_CONFIG)).thenReturn(true);
96+
when(mockConfig.getBoolean(BigQuerySinkConfig.ENABLE_BATCH_MODE_CONFIG)).thenReturn(true);
9397
when(mockConfig.getBoolean(BigQuerySinkConfig.USE_STORAGE_WRITE_API_CONFIG)).thenReturn(true);
9498

95-
String intermediateTable = "intermediate_topic";
96-
TableId intermediateTableId = TableId.of(DATASET, intermediateTable);
97-
when(mockMergeBatches.intermediateTableFor(BASE_TABLE_ID)).thenReturn(intermediateTableId);
99+
mockTableWithPartitioning(null);
98100

99101
recordTableResolver = new RecordTableResolver(mockConfig, mockMergeBatches, mockBigQuery);
100102
TableId tableId = recordTableResolver.getRecordTable(mockRecord).getFullTableId();
101-
102103
assertEquals(TOPIC, tableId.getTable());
103104
}
104105

0 commit comments

Comments
 (0)