Skip to content
This repository was archived by the owner on Dec 14, 2022. It is now read-only.

Commit 90d3022

Browse files
author
zouyunhe
committed
bug fix can not consume latest offset of specified subscription
1 parent 5410763 commit 90d3022

File tree

2 files changed

+9
-5
lines changed

2 files changed

+9
-5
lines changed

pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarSource.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -163,11 +163,10 @@ public class FlinkPulsarSource<T> extends RichParallelSourceFunction<T>
163163
private String externalSubscriptionName;
164164

165165
/**
166-
* The subscription position to use when subscription does not exist (default is {@link
167-
* MessageId#latest}); Only relevant when startup mode is {@link
168-
* StartupMode#EXTERNAL_SUBSCRIPTION}.
166+
* The subscription position to use when subscription does not exist; Only relevant when startup
167+
* mode is {@link StartupMode#EXTERNAL_SUBSCRIPTION}.
169168
*/
170-
private MessageId subscriptionPosition = MessageId.latest;
169+
private MessageId subscriptionPosition;
171170

172171
// TODO: remove this when MessageId is serializable itself.
173172
// see: https://github.com/apache/pulsar/pull/6064

pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/PulsarMetadataReader.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -386,6 +386,9 @@ public MessageId getPositionFromSubscription(TopicRange topic, MessageId default
386386
String subscriptionName = subscriptionNameFrom(topic);
387387
TopicStats topicStats = admin.topics().getStats(topic.getTopic());
388388
if (topicStats.getSubscriptions().containsKey(subscriptionName)) {
389+
if (defaultPosition != null) {
390+
return defaultPosition;
391+
}
389392
SubscriptionStats subStats = topicStats.getSubscriptions().get(subscriptionName);
390393
if (subStats.getConsumers().size() != 0) {
391394
throw new IllegalStateException(
@@ -411,8 +414,10 @@ public MessageId getPositionFromSubscription(TopicRange topic, MessageId default
411414
}
412415
} else {
413416
// create sub on topic
417+
MessageId consumePosition =
418+
defaultPosition == null ? MessageId.latest : defaultPosition;
414419
admin.topics()
415-
.createSubscription(topic.getTopic(), subscriptionName, defaultPosition);
420+
.createSubscription(topic.getTopic(), subscriptionName, consumePosition);
416421
return defaultPosition;
417422
}
418423
} catch (PulsarAdminException | UnsupportedEncodingException e) {

0 commit comments

Comments
 (0)