Commit 0cce961

Merge branch 'main' into storage-write-api-enable-partition-decorator

2 parents: 1b4593f + 0142bda

3 files changed: 107 additions, 3 deletions

.github/workflows/nightly.yml
Lines changed: 11 additions & 0 deletions

@@ -28,6 +28,12 @@ on:
   workflow_dispatch:
   schedule: ## run GMT 1:17 hours
     - cron: '17 1 * * *'
+  workflow_call:
+    secrets:
+      GCP_CREDENTIALS:
+      KCBQ_TEST_PROJECT:
+      KCBQ_TEST_DATASET:
+      KCBQ_TEST_BUCKET:
 
 permissions:
   contents: write
@@ -44,3 +50,8 @@ permissions:
 jobs:
   call-workflow-2-in-local-repo:
     uses: ./.github/workflows/manual.yml
+    secrets:
+      GCP_CREDENTIALS:
+      KCBQ_TEST_PROJECT:
+      KCBQ_TEST_DATASET:
+      KCBQ_TEST_BUCKET:

kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/convert/KafkaDataBuilder.java
Lines changed: 65 additions & 3 deletions

@@ -26,20 +26,58 @@
 
 import com.google.cloud.bigquery.Field;
 import com.google.cloud.bigquery.LegacySQLTypeName;
+import java.lang.invoke.MethodHandles;
+import java.lang.invoke.MethodType;
 import java.util.HashMap;
 import java.util.Map;
 import org.apache.kafka.connect.sink.SinkRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Helper class to construct schema and record for Kafka Data Field.
  */
 public class KafkaDataBuilder {
 
+  private static final Logger logger = LoggerFactory.getLogger(KafkaDataBuilder.class);
+
   public static final String KAFKA_DATA_TOPIC_FIELD_NAME = "topic";
   public static final String KAFKA_DATA_PARTITION_FIELD_NAME = "partition";
   public static final String KAFKA_DATA_OFFSET_FIELD_NAME = "offset";
   public static final String KAFKA_DATA_INSERT_TIME_FIELD_NAME = "insertTime";
 
+  // Marker for whether the SinkRecord methods needed to keep the original sink record
+  // metadata are available. These methods exist only since Kafka Connect API version 3.6.
+  private static final boolean KAFKA_CONNECT_API_POST_3_6;
+
+  static {
+    boolean kafkaConnectApiPost36;
+    try {
+      MethodHandles.lookup().findVirtual(
+          SinkRecord.class,
+          "originalTopic",
+          MethodType.methodType(String.class)
+      );
+      MethodHandles.lookup().findVirtual(
+          SinkRecord.class,
+          "originalKafkaPartition",
+          MethodType.methodType(Integer.class)
+      );
+      MethodHandles.lookup().findVirtual(
+          SinkRecord.class,
+          "originalKafkaOffset",
+          MethodType.methodType(long.class)
+      );
+      kafkaConnectApiPost36 = true;
+    } catch (NoSuchMethodException | IllegalAccessException e) {
+      logger.warn("This connector cannot retain the original topic/partition/offset fields of a SinkRecord. "
+          + "If these fields are mutated by upstream SMTs, they will be lost. "
+          + "Upgrade to Kafka Connect 3.6 to write reliable metadata into the resulting table.", e);
+      kafkaConnectApiPost36 = false;
+    }
+    KAFKA_CONNECT_API_POST_3_6 = kafkaConnectApiPost36;
+  }
+
   /**
    * Construct schema for Kafka Data Field
    *
@@ -60,6 +98,30 @@ public static Field buildKafkaDataField(String kafkaDataFieldName) {
         .setMode(com.google.cloud.bigquery.Field.Mode.NULLABLE).build();
   }
 
+  private static String tryGetOriginalTopic(SinkRecord kafkaConnectRecord) {
+    if (KAFKA_CONNECT_API_POST_3_6) {
+      return kafkaConnectRecord.originalTopic();
+    } else {
+      return kafkaConnectRecord.topic();
+    }
+  }
+
+  private static Integer tryGetOriginalKafkaPartition(SinkRecord kafkaConnectRecord) {
+    if (KAFKA_CONNECT_API_POST_3_6) {
+      return kafkaConnectRecord.originalKafkaPartition();
+    } else {
+      return kafkaConnectRecord.kafkaPartition();
+    }
+  }
+
+  private static long tryGetOriginalKafkaOffset(SinkRecord kafkaConnectRecord) {
+    if (KAFKA_CONNECT_API_POST_3_6) {
+      return kafkaConnectRecord.originalKafkaOffset();
+    } else {
+      return kafkaConnectRecord.kafkaOffset();
+    }
+  }
+
   /**
    * Construct a map of Kafka Data record
    *
@@ -68,9 +130,9 @@ public static Field buildKafkaDataField(String kafkaDataFieldName) {
    */
   public static Map<String, Object> buildKafkaDataRecord(SinkRecord kafkaConnectRecord) {
     HashMap<String, Object> kafkaData = new HashMap<>();
-    kafkaData.put(KAFKA_DATA_TOPIC_FIELD_NAME, kafkaConnectRecord.topic());
-    kafkaData.put(KAFKA_DATA_PARTITION_FIELD_NAME, kafkaConnectRecord.kafkaPartition());
-    kafkaData.put(KAFKA_DATA_OFFSET_FIELD_NAME, kafkaConnectRecord.kafkaOffset());
+    kafkaData.put(KAFKA_DATA_TOPIC_FIELD_NAME, tryGetOriginalTopic(kafkaConnectRecord));
+    kafkaData.put(KAFKA_DATA_PARTITION_FIELD_NAME, tryGetOriginalKafkaPartition(kafkaConnectRecord));
+    kafkaData.put(KAFKA_DATA_OFFSET_FIELD_NAME, tryGetOriginalKafkaOffset(kafkaConnectRecord));
     kafkaData.put(KAFKA_DATA_INSERT_TIME_FIELD_NAME, System.currentTimeMillis() / 1000.0);
     return kafkaData;
   }
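
For orientation, a minimal usage sketch of the changed builder (not part of the commit; the demo class name and the literal topic/partition/offset values are assumptions mirroring the test fixtures below):

import java.util.Map;

import com.wepay.kafka.connect.bigquery.convert.KafkaDataBuilder;
import org.apache.kafka.connect.sink.SinkRecord;

public class KafkaDataBuilderDemo {
  public static void main(String[] args) {
    // Constructor arguments: topic, partition, keySchema, key, valueSchema, value, offset.
    SinkRecord record = new SinkRecord("testTopic", 101, null, null, null, null, 1337L);

    Map<String, Object> kafkaData = KafkaDataBuilder.buildKafkaDataRecord(record);

    // On Kafka Connect >= 3.6 the topic/partition/offset entries come from the
    // original* accessors, so they survive SMTs that rewrite the record's coordinates;
    // on older runtimes the builder falls back to topic()/kafkaPartition()/kafkaOffset().
    System.out.println(kafkaData); // e.g. {offset=1337, partition=101, topic=testTopic, insertTime=...}
  }
}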

kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/convert/KafkaDataConverterTest.java
Lines changed: 31 additions & 0 deletions

@@ -45,6 +45,10 @@ public class KafkaDataConverterTest {
   private static final String kafkaDataTopicValue = "testTopic";
   private static final int kafkaDataPartitionValue = 101;
   private static final long kafkaDataOffsetValue = 1337;
+  private static final String kafkaDataMutatedTopicValue = "mutatedTopic";
+  private static final int kafkaDataMutatedPartitionValue = 201;
+  // In 3.6.1 there is no direct way to modify the offset via newRecord(), even though SinkRecord itself supports it.
+  private static final long kafkaDataMutatedOffsetValue = 456;
   Map<String, Object> expectedKafkaDataFields = new HashMap<>();
 
   @BeforeEach
@@ -67,6 +71,33 @@ public void testBuildKafkaDataRecord() {
     assertEquals(expectedKafkaDataFields, actualKafkaDataFields);
   }
 
+  @Test
+  public void testBuildKafkaDataRecordOnMutatedMetadata() {
+    SinkRecord record = new SinkRecord(
+        kafkaDataTopicValue,
+        kafkaDataPartitionValue,
+        null,
+        null,
+        null,
+        null,
+        kafkaDataOffsetValue
+    );
+    SinkRecord mutatedRecord = record.newRecord(
+        kafkaDataMutatedTopicValue,
+        kafkaDataMutatedPartitionValue,
+        null,
+        null,
+        null,
+        null,
+        null
+    );
+
+    Map<String, Object> actualKafkaDataFields = KafkaDataBuilder.buildKafkaDataRecord(mutatedRecord);
+    actualKafkaDataFields.remove(kafkaDataInsertTimeName);
+
+    assertEquals(expectedKafkaDataFields, actualKafkaDataFields);
+  }
+
   @Test
   public void testBuildKafkaDataRecordStorageWriteApi() {
     SinkRecord record = new SinkRecord(kafkaDataTopicValue, kafkaDataPartitionValue, null, null, null, null, kafkaDataOffsetValue);
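
To see the version gate in isolation, here is a minimal standalone sketch of the same MethodHandles probe the builder performs in its static initializer (the demo class name is hypothetical; only the findVirtual lookup mirrors the committed code):

import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;

import org.apache.kafka.connect.sink.SinkRecord;

public class SinkRecordProbeDemo {
  public static void main(String[] args) {
    boolean post36;
    try {
      // Throws NoSuchMethodException on Kafka Connect < 3.6, where SinkRecord
      // has no originalTopic() accessor; succeeds on 3.6 and later.
      MethodHandles.lookup().findVirtual(
          SinkRecord.class, "originalTopic", MethodType.methodType(String.class));
      post36 = true;
    } catch (NoSuchMethodException | IllegalAccessException e) {
      post36 = false;
    }
    System.out.println("original* metadata available: " + post36);
  }
}

Probing once at class-load time means the guarded branches never invoke the original* accessors on older runtimes, so no NoSuchMethodError can surface at the call sites.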
