Skip to content
This repository was archived by the owner on Dec 14, 2022. It is now read-only.

Commit 1f450a8

Browse files
author
高章敏
committed
external-sub support consuming from earliest offset
1 parent 66ccaf1 commit 1f450a8

File tree

3 files changed

+16
-3
lines changed

3 files changed

+16
-3
lines changed

pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/table/PulsarDynamicTableFactory.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@
5757
import static org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions.SCAN_STARTUP_MODE;
5858
import static org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions.SCAN_STARTUP_SPECIFIC_OFFSETS;
5959
import static org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions.SCAN_STARTUP_SUB_NAME;
60+
import static org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions.SCAN_STARTUP_SUB_START_OFFSET;
6061
import static org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions.SERVICE_URL;
6162
import static org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions.SINK_MESSAGE_ROUTER;
6263
import static org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions.SINK_SEMANTIC;
@@ -240,6 +241,7 @@ public Set<ConfigOption<?>> optionalOptions() {
240241
options.add(SCAN_STARTUP_MODE);
241242
options.add(SCAN_STARTUP_SPECIFIC_OFFSETS);
242243
options.add(SCAN_STARTUP_SUB_NAME);
244+
options.add(SCAN_STARTUP_SUB_START_OFFSET);
243245

244246
options.add(PARTITION_DISCOVERY_INTERVAL_MILLIS);
245247
options.add(SINK_SEMANTIC);

pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/table/PulsarDynamicTableSource.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -249,8 +249,7 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) {
249249
break;
250250
case EXTERNAL_SUBSCRIPTION:
251251
MessageId subscriptionPosition = MessageId.latest;
252-
if (CONNECTOR_STARTUP_MODE_VALUE_EARLIEST
253-
.equals(properties.get(CONNECTOR_EXTERNAL_SUB_DEFAULT_OFFSET))) {
252+
if (CONNECTOR_STARTUP_MODE_VALUE_EARLIEST.equals(startupOptions.externalSubStartOffset)) {
254253
subscriptionPosition = MessageId.earliest;
255254
}
256255
source.setStartFromSubscription(startupOptions.externalSubscriptionName, subscriptionPosition);

pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/table/PulsarTableOptions.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,13 @@ private PulsarTableOptions() {
170170
.key("scan.startup.sub-name")
171171
.stringType()
172172
.noDefaultValue()
173-
.withDescription("Optional sub-name used in case of \"specific-offsets\" startup mode");
173+
.withDescription("Optional sub-name used in case of \"external-subscription\" startup mode");
174+
175+
public static final ConfigOption<String> SCAN_STARTUP_SUB_START_OFFSET = ConfigOptions
176+
.key("scan.startup.sub-startOffset")
177+
.stringType()
178+
.defaultValue("latest")
179+
.withDescription("Optional sub-startOffset used in case of \"external-subscription\" startup mode");
174180

175181
public static final ConfigOption<Long> SCAN_STARTUP_TIMESTAMP_MILLIS = ConfigOptions
176182
.key("scan.startup.timestamp-millis")
@@ -446,6 +452,7 @@ public static StartupOptions getStartupOptions(
446452

447453
final Map<String, MessageId> specificOffsets = new HashMap<>();
448454
final List<String> subName = new ArrayList<>(1);
455+
final List<String> subStartOffset = new ArrayList<>(1);
449456
final StartupMode startupMode = tableOptions.getOptional(SCAN_STARTUP_MODE)
450457
.map(modeString -> {
451458
switch (modeString) {
@@ -476,6 +483,7 @@ public static StartupOptions getStartupOptions(
476483

477484
case PulsarValidator.CONNECTOR_STARTUP_MODE_VALUE_EXTERNAL_SUB:
478485
subName.add(tableOptions.get(SCAN_STARTUP_SUB_NAME));
486+
subStartOffset.add(tableOptions.get(SCAN_STARTUP_SUB_START_OFFSET));
479487
return StartupMode.EXTERNAL_SUBSCRIPTION;
480488

481489
default:
@@ -488,6 +496,9 @@ public static StartupOptions getStartupOptions(
488496
if (subName.size() != 0) {
489497
options.externalSubscriptionName = subName.get(0);
490498
}
499+
if (subStartOffset.size() != 0) {
500+
options.externalSubStartOffset = subStartOffset.get(0);
501+
}
491502
return options;
492503

493504
}
@@ -680,6 +691,7 @@ public static class StartupOptions {
680691
public StartupMode startupMode;
681692
public Map<String, MessageId> specificOffsets;
682693
public String externalSubscriptionName;
694+
public String externalSubStartOffset;
683695
}
684696

685697
/**

0 commit comments

Comments
 (0)