@@ -860,7 +860,7 @@ If you want to specify some advanced backoff options for ack timeout with differ
860860----
861861@EnablePulsar
862862@Configuration
863- static class AckTimeoutRedeliveryConfig {
863+ class AckTimeoutRedeliveryConfig {
864864
865865 @PulsarListener(subscriptionName = "withAckTimeoutRedeliveryBackoffSubscription",
866866 topics = "withAckTimeoutRedeliveryBackoff-test-topic",
@@ -871,7 +871,7 @@ static class AckTimeoutRedeliveryConfig {
871871 }
872872
873873 @Bean
874- public RedeliveryBackoff ackTimeoutRedeliveryBackoff() {
874+ RedeliveryBackoff ackTimeoutRedeliveryBackoff() {
875875 return MultiplierRedeliveryBackoff.builder().minDelayMs(1000).maxDelayMs(10 * 1000).multiplier(2)
876876 .build();
877877 }
@@ -909,7 +909,7 @@ Here is an example:
909909----
910910@EnablePulsar
911911@Configuration
912- static class NegativeAckRedeliveryConfig {
912+ class NegativeAckRedeliveryConfig {
913913
914914 @PulsarListener(subscriptionName = "withNegRedeliveryBackoffSubscription",
915915 topics = "withNegRedeliveryBackoff-test-topic", negativeAckRedeliveryBackoff = "redeliveryBackoff",
@@ -919,7 +919,7 @@ static class NegativeAckRedeliveryConfig {
919919 }
920920
921921 @Bean
922- public RedeliveryBackoff redeliveryBackoff() {
922+ RedeliveryBackoff redeliveryBackoff() {
923923 return MultiplierRedeliveryBackoff.builder().minDelayMs(1000).maxDelayMs(10 * 1000).multiplier(2)
924924 .build();
925925 }
@@ -940,7 +940,7 @@ Let us see some details around this feature in action by inspecting some code sn
940940----
941941@EnablePulsar
942942@Configuration
943- static class DeadLetterPolicyConfig {
943+ class DeadLetterPolicyConfig {
944944
945945 @PulsarListener(id = "deadLetterPolicyListener", subscriptionName = "deadLetterPolicySubscription",
946946 topics = "topic-with-dlp", deadLetterPolicy = "deadLetterPolicy",
@@ -997,10 +997,10 @@ Let us see some details by examining a few code snippets.
997997----
998998@EnablePulsar
999999@Configuration
1000- static class PulsarConsumerErrorHandlerConfig {
1000+ class PulsarConsumerErrorHandlerConfig {
10011001
1002- @Bean
1003- public PulsarConsumerErrorHandler<String> pulsarConsumerErrorHandler(
1002+ @Bean
1003+ PulsarConsumerErrorHandler<String> pulsarConsumerErrorHandler(
10041004 PulsarTemplate<String> pulsarTemplate) {
10051005 return new DefaultPulsarConsumerErrorHandler<>(
10061006 new PulsarDeadLetterPublishingRecoverer<>(pulsarTemplate, (c, m) -> "my-foo-dlt"), new FixedBackOff(100, 10));
@@ -1083,18 +1083,18 @@ Towards this extent, you can use a `PulsarConsumerErrorHandler` with the followi
10831083[source, java]
10841084----
10851085@Bean
1086- public PulsarConsumerErrorHandler<Integer> pulsarConsumerErrorHandler(PulsarClient pulsarClient) {
1087- PulsarProducerFactory<Integer> pulsarProducerFactory = new DefaultPulsarProducerFactory<>(pulsarClient, Map.of());
1088- PulsarTemplate<Integer> pulsarTemplate = new PulsarTemplate<>(pulsarProducerFactory);
1086+ PulsarConsumerErrorHandler<Integer> pulsarConsumerErrorHandler(PulsarClient pulsarClient) {
1087+ PulsarProducerFactory<Integer> pulsarProducerFactory = new DefaultPulsarProducerFactory<>(pulsarClient, Map.of());
1088+ PulsarTemplate<Integer> pulsarTemplate = new PulsarTemplate<>(pulsarProducerFactory);
10891089
1090- BiFunction<Consumer<?>, Message<?>, String> destinationResolver =
1091- (c, m) -> "my-foo-dlt";
1090+ BiFunction<Consumer<?>, Message<?>, String> destinationResolver =
1091+ (c, m) -> "my-foo-dlt";
10921092
1093- final PulsarDeadLetterPublishingRecoverer<Integer> pulsarDeadLetterPublishingRecoverer =
1094- new PulsarDeadLetterPublishingRecoverer<>(pulsarTemplate, destinationResolver);
1093+ PulsarDeadLetterPublishingRecoverer<Integer> pulsarDeadLetterPublishingRecoverer =
1094+ new PulsarDeadLetterPublishingRecoverer<>(pulsarTemplate, destinationResolver);
10951095
1096- return new DefaultPulsarConsumerErrorHandler<>(pulsarDeadLetterPublishingRecoverer,
1097- new FixedBackOff(100, 5));
1096+ return new DefaultPulsarConsumerErrorHandler<>(pulsarDeadLetterPublishingRecoverer,
1097+ new FixedBackOff(100, 5));
10981098}
10991099----
11001100====
@@ -1121,17 +1121,17 @@ First, let us look at a batch `PulsarListener` method.
11211121@PulsarListener(subscriptionName = "batch-demo-5-sub", topics = "batch-demo-4", batch = true, concurrency = "3",
11221122 subscriptionType = SubscriptionType.Failover,
11231123 pulsarConsumerErrorHandler = "pulsarConsumerErrorHandler", ackMode = AckMode.MANUAL)
1124- public void listen(List<Message<Integer>> data, Consumer<Integer> consumer, Acknowledgment acknowledgment) {
1124+ void listen(List<Message<Integer>> data, Consumer<Integer> consumer, Acknowledgment acknowledgment) {
11251125 for (Message<Integer> datum : data) {
1126- if (datum.getValue() == 5) {
1126+ if (datum.getValue() == 5) {
11271127 throw new PulsarBatchListenerFailedException("failed", datum);
1128- }
1129- acknowledgement.acknowledge(datum.getMessageId());
1130- }
1128+ }
1129+ acknowledgement.acknowledge(datum.getMessageId());
1130+ }
11311131}
11321132
11331133@Bean
1134- public PulsarConsumerErrorHandler<String> pulsarConsumerErrorHandler(
1134+ PulsarConsumerErrorHandler<String> pulsarConsumerErrorHandler(
11351135 PulsarTemplate<String> pulsarTemplate) {
11361136 return new DefaultPulsarConsumerErrorHandler<>(
11371137 new PulsarDeadLetterPublishingRecoverer<>(pulsarTemplate, (c, m) -> "my-foo-dlt"), new FixedBackOff(100, 10));
0 commit comments