Skip to content
This repository was archived by the owner on Dec 14, 2022. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -163,11 +163,10 @@ public class FlinkPulsarSource<T> extends RichParallelSourceFunction<T>
private String externalSubscriptionName;

/**
* The subscription position to use when subscription does not exist (default is {@link
* MessageId#latest}); Only relevant when startup mode is {@link
* StartupMode#EXTERNAL_SUBSCRIPTION}.
* The subscription position to use when subscription does not exist; Only relevant when startup
* mode is {@link StartupMode#EXTERNAL_SUBSCRIPTION}.
*/
private MessageId subscriptionPosition = MessageId.latest;
private MessageId subscriptionPosition;

// TODO: remove this when MessageId is serializable itself.
// see: https://github.com/apache/pulsar/pull/6064
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,9 @@ public MessageId getPositionFromSubscription(TopicRange topic, MessageId default
String subscriptionName = subscriptionNameFrom(topic);
TopicStats topicStats = admin.topics().getStats(topic.getTopic());
if (topicStats.getSubscriptions().containsKey(subscriptionName)) {
if (defaultPosition != null) {
return defaultPosition;
}
SubscriptionStats subStats = topicStats.getSubscriptions().get(subscriptionName);
if (subStats.getConsumers().size() != 0) {
throw new IllegalStateException(
Expand All @@ -411,8 +414,10 @@ public MessageId getPositionFromSubscription(TopicRange topic, MessageId default
}
} else {
// create sub on topic
MessageId consumePosition =
defaultPosition == null ? MessageId.latest : defaultPosition;
admin.topics()
.createSubscription(topic.getTopic(), subscriptionName, defaultPosition);
.createSubscription(topic.getTopic(), subscriptionName, consumePosition);
return defaultPosition;
}
} catch (PulsarAdminException | UnsupportedEncodingException e) {
Expand Down