5858import org .apache .flink .streaming .connectors .pulsar .internal .TopicSubscription ;
5959import org .apache .flink .streaming .connectors .pulsar .internal .TopicSubscriptionSerializer ;
6060import org .apache .flink .streaming .connectors .pulsar .serialization .PulsarDeserializationSchema ;
61+ import org .apache .flink .streaming .connectors .pulsar .table .PulsarTableOptions ;
6162import org .apache .flink .streaming .runtime .operators .util .AssignerWithPeriodicWatermarksAdapter ;
6263import org .apache .flink .streaming .runtime .operators .util .AssignerWithPunctuatedWatermarksAdapter ;
6364import org .apache .flink .streaming .runtime .tasks .ProcessingTimeService ;
6465import org .apache .flink .util .ExceptionUtils ;
6566import org .apache .flink .util .SerializedValue ;
67+ import org .apache .flink .util .TimeUtils ;
6668
6769import org .apache .flink .shaded .guava18 .com .google .common .collect .Sets ;
6870
7375import org .apache .pulsar .shade .com .google .common .collect .Maps ;
7476import org .apache .pulsar .shade .org .apache .commons .lang3 .StringUtils ;
7577
78+ import java .time .Duration ;
7679import java .util .ArrayList ;
7780import java .util .HashMap ;
7881import java .util .HashSet ;
8588import java .util .Set ;
8689import java .util .TreeMap ;
8790import java .util .UUID ;
91+ import java .util .concurrent .Executors ;
92+ import java .util .concurrent .ScheduledExecutorService ;
93+ import java .util .concurrent .TimeUnit ;
8894import java .util .concurrent .atomic .AtomicReference ;
8995import java .util .stream .Collectors ;
9096
@@ -238,6 +244,12 @@ public class FlinkPulsarSource<T> extends RichParallelSourceFunction<T>
238244
239245 private long startupOffsetsTimestamp = -1L ;
240246
247+ private boolean enableOffsetAutoCommit ;
248+
249+ private Duration offsetAutoCommitInterval ;
250+
251+ private ScheduledExecutorService offsetCommitScheduler ;
252+
241253 public FlinkPulsarSource (
242254 String adminUrl ,
243255 ClientConfigurationData clientConf ,
@@ -264,6 +276,23 @@ public FlinkPulsarSource(
264276 }
265277 this .oldStateVersion =
266278 SourceSinkUtils .getOldStateVersion (caseInsensitiveParams , oldStateVersion );
279+
280+ this .enableOffsetAutoCommit =
281+ Boolean .parseBoolean (
282+ properties
283+ .getOrDefault (
284+ PulsarTableOptions .ENABLE_OFFSET_AUTO_COMMIT .key (), "true" )
285+ .toString ());
286+ if (enableOffsetAutoCommit ) {
287+ log .info ("enable offset auto commit." );
288+ this .offsetAutoCommitInterval =
289+ TimeUtils .parseDuration (
290+ properties
291+ .getOrDefault (
292+ PulsarTableOptions .OFFSET_AUTO_COMMIT_INTERVAL .key (),
293+ "60 s" )
294+ .toString ());
295+ }
267296 }
268297
269298 public FlinkPulsarSource (
@@ -551,6 +580,28 @@ public void open(Configuration parameters) throws Exception {
551580 ownedTopicStarts );
552581 }
553582 }
583+
584+ if (!((StreamingRuntimeContext ) getRuntimeContext ()).isCheckpointingEnabled ()
585+ && enableOffsetAutoCommit ) {
586+ this .offsetCommitScheduler = Executors .newScheduledThreadPool (1 );
587+ this .offsetCommitScheduler .scheduleAtFixedRate (
588+ () -> {
589+ if (pulsarFetcher != null ) {
590+ Map <TopicRange , MessageId > consumedOffsets =
591+ pulsarFetcher .snapshotCurrentState ();
592+ try {
593+ log .info ("commit offset to pulsar cluster." );
594+ pulsarFetcher .commitOffsetToPulsar (
595+ consumedOffsets , offsetCommitCallback );
596+ } catch (InterruptedException e ) {
597+
598+ }
599+ }
600+ },
601+ 0 ,
602+ offsetAutoCommitInterval != null ? offsetAutoCommitInterval .getSeconds () : 60 ,
603+ TimeUnit .SECONDS );
604+ }
554605 }
555606
556607 protected String getSubscriptionName () {
@@ -742,6 +793,10 @@ public void close() throws Exception {
742793 }
743794 }
744795
796+ if (offsetCommitScheduler != null ) {
797+ offsetCommitScheduler .shutdown ();
798+ }
799+
745800 try {
746801 super .close ();
747802 } catch (Exception e ) {
0 commit comments