diff --git a/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarSource.java b/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarSource.java index 44eb5f1d..11e3bcbc 100644 --- a/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarSource.java +++ b/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarSource.java @@ -163,11 +163,10 @@ public class FlinkPulsarSource extends RichParallelSourceFunction private String externalSubscriptionName; /** - * The subscription position to use when subscription does not exist (default is {@link - * MessageId#latest}); Only relevant when startup mode is {@link - * StartupMode#EXTERNAL_SUBSCRIPTION}. + * The subscription position to use when subscription does not exist; Only relevant when startup + * mode is {@link StartupMode#EXTERNAL_SUBSCRIPTION}. */ - private MessageId subscriptionPosition = MessageId.latest; + private MessageId subscriptionPosition; // TODO: remove this when MessageId is serializable itself. // see: https://github.com/apache/pulsar/pull/6064 diff --git a/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/PulsarMetadataReader.java b/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/PulsarMetadataReader.java index 4cf5790f..a03f8962 100644 --- a/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/PulsarMetadataReader.java +++ b/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/PulsarMetadataReader.java @@ -386,6 +386,9 @@ public MessageId getPositionFromSubscription(TopicRange topic, MessageId default String subscriptionName = subscriptionNameFrom(topic); TopicStats topicStats = admin.topics().getStats(topic.getTopic()); if (topicStats.getSubscriptions().containsKey(subscriptionName)) { + if (defaultPosition != null) { + return defaultPosition; + } SubscriptionStats subStats = topicStats.getSubscriptions().get(subscriptionName); if (subStats.getConsumers().size() != 0) { throw new IllegalStateException( @@ -411,8 +414,10 @@ public MessageId getPositionFromSubscription(TopicRange topic, MessageId default } } else { // create sub on topic + MessageId consumePosition = + defaultPosition == null ? MessageId.latest : defaultPosition; admin.topics() - .createSubscription(topic.getTopic(), subscriptionName, defaultPosition); + .createSubscription(topic.getTopic(), subscriptionName, consumePosition); return defaultPosition; } } catch (PulsarAdminException | UnsupportedEncodingException e) {