Skip to content
This repository was archived by the owner on Dec 14, 2022. It is now read-only.

Commit 42c94cf

Browse files
author
高章敏
committed
fix config name to scan.startup.sub-start-offset
1 parent 1f450a8 commit 42c94cf

File tree

2 files changed

+42
-54
lines changed

2 files changed

+42
-54
lines changed

pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/table/PulsarDynamicTableFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -189,7 +189,7 @@ public DynamicTableSource createDynamicTableSource(Context context) {
189189
Properties properties = getPulsarProperties(context.getCatalogTable().toProperties());
190190

191191
final PulsarTableOptions.StartupOptions startupOptions = PulsarTableOptions
192-
.getStartupOptions(tableOptions, topics);
192+
.getStartupOptions(tableOptions);
193193

194194
final DataType physicalDataType = context.getCatalogTable().getSchema().toPhysicalRowDataType();
195195

pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/table/PulsarTableOptions.java

Lines changed: 41 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -173,10 +173,10 @@ private PulsarTableOptions() {
173173
.withDescription("Optional sub-name used in case of \"external-subscription\" startup mode");
174174

175175
public static final ConfigOption<String> SCAN_STARTUP_SUB_START_OFFSET = ConfigOptions
176-
.key("scan.startup.sub-startOffset")
176+
.key("scan.startup.sub-start-offset")
177177
.stringType()
178178
.defaultValue("latest")
179-
.withDescription("Optional sub-startOffset used in case of \"external-subscription\" startup mode");
179+
.withDescription("Optional sub-start-offset used in case of \"external-subscription\" startup mode");
180180

181181
public static final ConfigOption<Long> SCAN_STARTUP_TIMESTAMP_MILLIS = ConfigOptions
182182
.key("scan.startup.timestamp-millis")
@@ -446,58 +446,46 @@ private static boolean hasPulsarClientProperties(Map<String, String> tableOption
446446
return tableOptions.keySet().stream().anyMatch(k -> k.startsWith(PROPERTIES_PREFIX));
447447
}
448448

449-
public static StartupOptions getStartupOptions(
450-
ReadableConfig tableOptions,
451-
List<String> topics) {
452-
453-
final Map<String, MessageId> specificOffsets = new HashMap<>();
454-
final List<String> subName = new ArrayList<>(1);
455-
final List<String> subStartOffset = new ArrayList<>(1);
456-
final StartupMode startupMode = tableOptions.getOptional(SCAN_STARTUP_MODE)
457-
.map(modeString -> {
458-
switch (modeString) {
459-
case PulsarValidator.CONNECTOR_STARTUP_MODE_VALUE_EARLIEST:
460-
return StartupMode.EARLIEST;
461-
462-
case PulsarValidator.CONNECTOR_STARTUP_MODE_VALUE_LATEST:
463-
return StartupMode.LATEST;
464-
465-
case PulsarValidator.CONNECTOR_STARTUP_MODE_VALUE_SPECIFIC_OFFSETS:
466-
String specificOffsetsStrOpt = tableOptions.get(SCAN_STARTUP_SPECIFIC_OFFSETS);
467-
468-
final Map<Integer, String> offsetList = parseSpecificOffsets(
469-
specificOffsetsStrOpt,
470-
SCAN_STARTUP_SPECIFIC_OFFSETS.key());
471-
offsetList.forEach((partition, offset) -> {
472-
try {
473-
474-
final MessageIdImpl messageId = parseMessageId(offset);
475-
specificOffsets.put(partition.toString(), messageId);
476-
} catch (Exception e) {
477-
log.error("Failed to decode message id from properties {}",
478-
ExceptionUtils.stringifyException(e));
479-
throw new RuntimeException(e);
480-
}
481-
});
482-
return StartupMode.SPECIFIC_OFFSETS;
483-
484-
case PulsarValidator.CONNECTOR_STARTUP_MODE_VALUE_EXTERNAL_SUB:
485-
subName.add(tableOptions.get(SCAN_STARTUP_SUB_NAME));
486-
subStartOffset.add(tableOptions.get(SCAN_STARTUP_SUB_START_OFFSET));
487-
return StartupMode.EXTERNAL_SUBSCRIPTION;
488-
489-
default:
490-
throw new TableException("Unsupported startup mode. Validator should have checked that.");
491-
}
492-
}).orElse(StartupMode.LATEST);
449+
public static StartupOptions getStartupOptions(ReadableConfig tableOptions) {
493450
final StartupOptions options = new StartupOptions();
494-
options.startupMode = startupMode;
495-
options.specificOffsets = specificOffsets;
496-
if (subName.size() != 0) {
497-
options.externalSubscriptionName = subName.get(0);
498-
}
499-
if (subStartOffset.size() != 0) {
500-
options.externalSubStartOffset = subStartOffset.get(0);
451+
final Map<String, MessageId> specificOffsets = new HashMap<>();
452+
Optional<String> modeString = tableOptions.getOptional(SCAN_STARTUP_MODE);
453+
if(!modeString.isPresent()) {
454+
options.startupMode = StartupMode.LATEST;
455+
} else {
456+
switch (modeString.get()) {
457+
case PulsarValidator.CONNECTOR_STARTUP_MODE_VALUE_EARLIEST:
458+
options.startupMode = StartupMode.EARLIEST;
459+
460+
case PulsarValidator.CONNECTOR_STARTUP_MODE_VALUE_LATEST:
461+
options.startupMode = StartupMode.LATEST;
462+
463+
case PulsarValidator.CONNECTOR_STARTUP_MODE_VALUE_SPECIFIC_OFFSETS:
464+
String specificOffsetsStrOpt = tableOptions.get(SCAN_STARTUP_SPECIFIC_OFFSETS);
465+
466+
final Map<Integer, String> offsetList = parseSpecificOffsets(
467+
specificOffsetsStrOpt,
468+
SCAN_STARTUP_SPECIFIC_OFFSETS.key());
469+
offsetList.forEach((partition, offset) -> {
470+
try {
471+
final MessageIdImpl messageId = parseMessageId(offset);
472+
specificOffsets.put(partition.toString(), messageId);
473+
} catch (Exception e) {
474+
log.error("Failed to decode message id from properties {}",
475+
ExceptionUtils.stringifyException(e));
476+
throw new RuntimeException(e);
477+
}
478+
});
479+
options.startupMode = StartupMode.SPECIFIC_OFFSETS;
480+
options.specificOffsets = specificOffsets;
481+
482+
case PulsarValidator.CONNECTOR_STARTUP_MODE_VALUE_EXTERNAL_SUB:
483+
options.externalSubscriptionName = tableOptions.get(SCAN_STARTUP_SUB_NAME);
484+
options.externalSubStartOffset = tableOptions.get(SCAN_STARTUP_SUB_START_OFFSET);
485+
options.startupMode = StartupMode.EXTERNAL_SUBSCRIPTION;
486+
default:
487+
throw new TableException("Unsupported startup mode. Validator should have checked that.");
488+
}
501489
}
502490
return options;
503491

0 commit comments

Comments
 (0)