Skip to content

Commit 7c6f6b7

Browse files
committed
Error Handling Docs
- Native error handling in Pulsar - PulsarConsumerErrorHnadler in Spring Pulsar
1 parent 6d44744 commit 7c6f6b7

File tree

1 file changed

+327
-0
lines changed

1 file changed

+327
-0
lines changed

spring-pulsar-docs/src/main/asciidoc/pulsar.adoc

Lines changed: 327 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -827,6 +827,333 @@ template.setSchema(JSONSchema.of(Foo.class));
827827

828828
TIP: Complex Schema types that are currently supported are JSON, AVRO, PROTOBUF, and KEY_VALUE. For KEY_VALUE schemata, only INLINE encoding is supported.
829829

830+
==== Message Redelivery and Error Handling
831+
832+
Now that we have seen both `PulsarListener` and the message listener container infrastructure, and its various functions, let us now try to understand message redelivery and error handling.
833+
Apache Pulsar provides various native strategies for message redelivery and error handling, and we are going to take a look at them first and see how we can leverage them through Spring for Apache Pulsar.
834+
835+
===== Specifying Acknowledgment Timeout for Message Redelivery
836+
837+
By default, Pulsar consumers will not redeliver messages unless the consumer crashes, but you can change this behavior by setting an ack timeout on the Pulsar consumer.
838+
When using Spring for Apache Pulsar, we can enable this property by setting the Boot property `spring.pulsar.consumer.ack-timeout-millis`.
839+
If this property has a value above zero, then if Pulsar consumer does not acknowledge a message within that timeout period, then the message will be redelivered.
840+
841+
You can also specify this property directly as a Pulsar consumer property on the `PulsarListener` itself as shown below:
842+
843+
====
844+
[source, java]
845+
----
846+
@PulsarListener(subscriptionName = "subscription-1", topics = "topic-1"
847+
properties = {"ackTimeoutMillis=60000"})
848+
public void listen(String s) {
849+
...
850+
}
851+
----
852+
====
853+
854+
When specifying `ackTimeoutMillis` as seen in the above `PulsarListener` method, then if the consumer does not send an acknowledgement within 60 seconds, the message will be redelivered by Pulsar to the consumer.
855+
856+
If you want to specify some advanced backoff options for ack timeout with different delays, then you can do the following:
857+
858+
====
859+
[source, java]
860+
----
861+
@EnablePulsar
862+
@Configuration
863+
static class AckTimeoutRedeliveryConfig {
864+
865+
@PulsarListener(subscriptionName = "withAckTimeoutRedeliveryBackoffSubscription",
866+
topics = "withAckTimeoutRedeliveryBackoff-test-topic",
867+
ackTimeoutRedeliveryBackoff = "ackTimeoutRedeliveryBackoff",
868+
properties = { "ackTimeoutMillis=60000" })
869+
void listen(String msg) {
870+
// some long-running process that may cause an ack timeout
871+
}
872+
873+
@Bean
874+
public RedeliveryBackoff ackTimeoutRedeliveryBackoff() {
875+
return MultiplierRedeliveryBackoff.builder().minDelayMs(1000).maxDelayMs(10 * 1000).multiplier(2)
876+
.build();
877+
}
878+
879+
}
880+
----
881+
====
882+
883+
In the example above, we are specifying a bean for Pulsar's `RedeliveryBackoff` with a minimum delay of 1 second and a maximum delay of 10 seconds with a backoff multiplier of 2.
884+
After the initial ack timeout occurs, then the message redeliveries will be controlled through this backoff bean.
885+
We provide the backoff bean to the `PulsarListener` annotation by setting the `ackTimeoutRedeliveryBackoff` property to the actual bean name - `ackTimeoutRedeliveryBackoff` in this case.
886+
887+
===== Specifying Negative Acknowledgment Redelivery
888+
889+
When acknowledging negatively, Pulsar consumer allows you to specify how the application want the message to be re-delivered.
890+
The default is to redeliver the message in 1 minute, but you can change it by providing `spring.pulsar.consumer.negative-ack-redelivery-delay-micros`.
891+
You can also set it as a consumer property directly on `PulsarListener` as shown below:
892+
893+
====
894+
[source, java]
895+
----
896+
@PulsarListener(subscriptionName = "subscription-1", topics = "topic-1"
897+
properties = {"negativeAckRedeliveryDelayMicros=10000"})
898+
public void listen(String s) {
899+
...
900+
}
901+
----
902+
====
903+
904+
Here also, you can specify different delays and backoff mechanisms with a multiplier by providing a `RedeliveryBackoff` bean and provide the bean name as the `negativeAckRedeliveryBackoff` property on the PulsarProducer.
905+
Here is an example:
906+
907+
====
908+
[source, java]
909+
----
910+
@EnablePulsar
911+
@Configuration
912+
static class NegativeAckRedeliveryConfig {
913+
914+
@PulsarListener(subscriptionName = "withNegRedeliveryBackoffSubscription",
915+
topics = "withNegRedeliveryBackoff-test-topic", negativeAckRedeliveryBackoff = "redeliveryBackoff",
916+
subscriptionType = SubscriptionType.Shared)
917+
void listen(String msg) {
918+
throw new RuntimeException("fail " + msg);
919+
}
920+
921+
@Bean
922+
public RedeliveryBackoff redeliveryBackoff() {
923+
return MultiplierRedeliveryBackoff.builder().minDelayMs(1000).maxDelayMs(10 * 1000).multiplier(2)
924+
.build();
925+
}
926+
927+
}
928+
----
929+
====
930+
931+
===== Using Dead Letter Topic from Apache Pulsar for Message Redelivery and Error Handling
932+
933+
Apache Pulsar allows applications to use a dead letter topic on consumers with a `Shared` subscription type.
934+
For subscription types `Exclusive` and `Failover`, this feature is not available.
935+
The basic idea is that if a message is retried for a certain number of times, maybe due to an ack timeout or nack redelivery, and once the number of retries are exhausted, then the message can be sent to a special topic called DLQ.
936+
Let us see some details around this feature in action by inspecting some code snippets.
937+
938+
====
939+
[source, java]
940+
----
941+
@EnablePulsar
942+
@Configuration
943+
static class DeadLetterPolicyConfig {
944+
945+
@PulsarListener(id = "deadLetterPolicyListener", subscriptionName = "deadLetterPolicySubscription",
946+
topics = "topic-with-dlp", deadLetterPolicy = "deadLetterPolicy",
947+
subscriptionType = SubscriptionType.Shared, properties = { "ackTimeoutMillis=1" })
948+
void listen(String msg) {
949+
throw new RuntimeException("fail " + msg);
950+
}
951+
952+
@PulsarListener(id = "dlqListener", topics = "my-dlq-topic")
953+
void listenDlq(String msg) {
954+
System.out.println("From DLQ: " + msg);
955+
}
956+
957+
@Bean
958+
DeadLetterPolicy deadLetterPolicy() {
959+
return DeadLetterPolicy.builder().maxRedeliverCount(10).deadLetterTopic("my-dlq-topic").build();
960+
}
961+
962+
}
963+
----
964+
====
965+
966+
Let us go through some details.
967+
First, we have a special bean for `DeadLetterPolicy` and it's named as `deadLetterPolicy` (it acn be any name as you wish).
968+
This bean specifies a number of things, such as the max delivery - 10 in this case, and the name of the dead letter topic - `my-dlq-topic`.
969+
If you don't specify a DLQ topic name, then it defaults to `<topicname>-<subscriptionname>-DLQ` in Pulsar.
970+
Next, we provide this bean name to `PulsarListener` using the property `deadLetterPolicy`.
971+
Note that the `PulsarListener` has a subscription type of `Shared`, as the DLQ feature only works with shared subscriptions.
972+
This code is primarily for demonstration purposes, so we provide an `ackTimeoutMillis` value of 1 millisecond.
973+
The idea is that the code throws the exception and if Pulsar does not receive an ack within 1 millisecond, it does a retry.
974+
If that cycle continues for 10 times, (as that is our max redelivery count in the `DeadLetterPolicy`), then Pulsar consumer publishes the messages to the DQL topic.
975+
We have another `PulsarListener` that is listening on the DLQ topic to receive data as it is published to the DLQ topic.
976+
977+
**Special note on DLQ topics when using partitioned topics**: If the main topic is partitioned, then behind the scenes, each partition is treated as a separate topic by Pulsar.
978+
Pulsar appends `partition-<n>` where `n` stands for the partition number to the main topic name.
979+
The problem is that, if you do not specify a DLQ topic (as opposed to what we did above), then Pulsar will publish to a default topic name that has this ``partition-<n>` info in it - for ex: `topic-with-dlp-partition-0-deadLetterPolicySubscription-DLQ`.
980+
The easy way to solve this is to provide a DLQ topic name always.
981+
982+
===== Native Error Handling in Spring for Apache Pulsar
983+
984+
As we have noted above, the DLQ feature in Apache Pulsar only works for shared subscriptions.
985+
What does an application do if they need to use some similar feature for non-shared subscriptions?
986+
The main reason why Pulsar does not support DLQ on exclusive and failover subscriptions, is because those subscription types are order-guaranteed.
987+
By allowing redeliveries, DLQ etc. it effectively receives messages in out-of-order.
988+
But, what if some applications are okay with that, but more importantly needs this DLQ feature for non-shared subscriptions?
989+
For that, Spring for Apache Pulsar provides a `PulsarConsumerErrorHandler` which can be used across any subscription types in Pulsar - `Exclusive`, `Failover`, `Shared`, `Key_Shared`.
990+
991+
When using `PulsarConsumerErrorHandler` from Spring for Apache Pulsar, make sure not to set the ack timeout properties on the listener.
992+
993+
Let us see some details by examining a few code snippets.
994+
995+
====
996+
[source, java]
997+
----
998+
@EnablePulsar
999+
@Configuration
1000+
static class PulsarConsumerErrorHandlerConfig {
1001+
1002+
@Bean
1003+
public PulsarConsumerErrorHandler<String> pulsarConsumerErrorHandler(
1004+
PulsarTemplate<String> pulsarTemplate) {
1005+
return new DefaultPulsarConsumerErrorHandler<>(
1006+
new PulsarDeadLetterPublishingRecoverer<>(pulsarTemplate, (c, m) -> "my-foo-dlt"), new FixedBackOff(100, 10));
1007+
}
1008+
1009+
@PulsarListener(id = "pulsarConsumerErrorHandler-id", subscriptionName = "pulsatConsumerErrorHandler-subscription",
1010+
topics = "pulsarConsumerErrorHandler-topic",
1011+
pulsarConsumerErrorHandler = "pulsarConsumerErrorHandler")
1012+
void listen(String msg) {
1013+
throw new RuntimeException("fail " + msg);
1014+
}
1015+
1016+
@PulsarListener(id = "pceh-dltListener", topics = "my-foo-dlt")
1017+
void listenDlt(String msg) {
1018+
System.out.println("From DLT: " + msg);
1019+
}
1020+
1021+
}
1022+
----
1023+
====
1024+
1025+
Let us take a look at the `pulsarConsumerErrorHandler` bean provided.
1026+
This creates a bean of type `PulsarConsumerErrorHandler` and uses the default implementation provided out of the box by Spring for Apache Pulsar - `DefaultPulsarConsumerErrorHandler`.
1027+
`DefaultPulsarConsumerErrorHandler` has a constructor that takes a `PulsarMessageRecovererFactory` and a `org.springframework.util.backoff.Backoff`.
1028+
`PulsarMessageRecovererFactory` is a functional interface with the following API:
1029+
1030+
====
1031+
[source, java]
1032+
----
1033+
@FunctionalInterface
1034+
public interface PulsarMessageRecovererFactory<T> {
1035+
1036+
/**
1037+
* Provides a message recoverer {@link PulsarMessageRecoverer}.
1038+
* @param consumer Pulsar consumer
1039+
* @return {@link PulsarMessageRecoverer}.
1040+
*/
1041+
PulsarMessageRecoverer<T> recovererForConsumer(Consumer<T> consumer);
1042+
1043+
}
1044+
1045+
----
1046+
====
1047+
1048+
The `recovererForConsumer` method takes a Pulsar consumer and returns a `PulsarMessageRecoverer` which is another functional interface.
1049+
Here is the API of `PulsarMessageRecoverer`:
1050+
1051+
====
1052+
[source, java]
1053+
----
1054+
public interface PulsarMessageRecoverer<T> {
1055+
1056+
/**
1057+
* Recover a failed message, for e.g. send the message to a DLT.
1058+
* @param message Pulsar message
1059+
* @param exception exception from failed message
1060+
*/
1061+
void recoverMessage(Message<T> message, Exception exception);
1062+
1063+
}
1064+
----
1065+
====
1066+
1067+
Spring for Apache Pulsar provides an implementation for `PulsarMessageRecovererFactory` called `PulsarDeadLetterPublishingRecoverer` that provides a default implementation that is capable of recovering the message by sending it to a DLT - (Dead Letter Topic).
1068+
This is the implementation that we are providing to the constructor for `DefaultPulsarConsumerErrorHandler` above.
1069+
As the second argument, we are providing a `FixedBackOff`.
1070+
You can also provide the `ExponentialBackoff` from Spring for advanced backoff features.
1071+
Then we provide this bean name for the `PulsarConsumerErrorHandler` as a property to the `PulsarListener`.
1072+
The property is called `pulsarConsumerErrorHandler`.
1073+
Each time the `PulsarListener` method fails for a message, it gets retried.
1074+
The number of retries are controlled by the `Backoff` implementation values provided - in our example, we do 10 retries - 11 total tries all in all - the first one and then the 10 retries.
1075+
Once all the retries are exhausted, the message is sent to the DLT topic.
1076+
1077+
The `PulsarDeadLetterPublishingRecoverer` implementation we provide use a `PulsarTemplate` that is uses for publishing the message to the DLT.
1078+
In most cases, the same auto-configured `PulsarTemplate` from Spring Boot is sufficient with the caveat for partitioned topics.
1079+
When using partitioned topics and using custom message routing for the main topic, you must use a different `PulsarTemplate` that does not take the autoconfigured `PulsarProducerFactory` that is populated with a value of `custompartition` for `message-routing-mode`.
1080+
Towards this extent, you can use a `PulsarConsumerErrorHandler` with the following blueprint.
1081+
1082+
====
1083+
[source, java]
1084+
----
1085+
@Bean
1086+
public PulsarConsumerErrorHandler<Integer> pulsarConsumerErrorHandler(PulsarClient pulsarClient) {
1087+
PulsarProducerFactory<Integer> pulsarProducerFactory = new DefaultPulsarProducerFactory<>(pulsarClient, Map.of());
1088+
PulsarTemplate<Integer> pulsarTemplate = new PulsarTemplate<>(pulsarProducerFactory);
1089+
1090+
BiFunction<Consumer<?>, Message<?>, String> destinationResolver =
1091+
(c, m) -> "my-foo-dlt";
1092+
1093+
final PulsarDeadLetterPublishingRecoverer<Integer> pulsarDeadLetterPublishingRecoverer =
1094+
new PulsarDeadLetterPublishingRecoverer<>(pulsarTemplate, destinationResolver);
1095+
1096+
return new DefaultPulsarConsumerErrorHandler<>(pulsarDeadLetterPublishingRecoverer,
1097+
new FixedBackOff(100, 5));
1098+
}
1099+
----
1100+
====
1101+
1102+
Note that, we are providing a destination resolver to the `PulsarDeadLetterPublishingRecoverer` as the second constructor argument.
1103+
If not provided, `PulsarDeadLetterPublishingRecoverer` will use `<subscription-name>-<topic-name>-DLT>` as the DLT topic name.
1104+
When using this feature, it is recommended to use a properr destination name by setting the destination resolver rather than using the default.
1105+
1106+
When using a single record message listener as we did above with `PulsarConsumerErrorHnadler` and if you are using manual acknowledgement, make sure not to negatively acknowledge the message when an exception is thrown.
1107+
Rather, just simply rethrow the exception back to the container; otherwise, the container thinks that the message is handled separately and the error handling will not be triggered.
1108+
1109+
Finally, we have a second `PulsarListener` above that is receiving messages from the DLT topic.
1110+
1111+
In the examples provided in this section so far, we only saw how to use `PulsarConsumerErrorHandler` with a single record message listener.
1112+
Next, we will look how can use this on batch listeners.
1113+
1114+
**Batch listener with PulsarConsumerErrorHandler**
1115+
1116+
First, let us look at a batch `PulsarListener` method.
1117+
1118+
====
1119+
[source, java]
1120+
----
1121+
@PulsarListener(subscriptionName = "batch-demo-5-sub", topics = "batch-demo-4", batch = true, concurrency = "3",
1122+
subscriptionType = SubscriptionType.Failover,
1123+
pulsarConsumerErrorHandler = "pulsarConsumerErrorHandler", ackMode = AckMode.MANUAL)
1124+
public void listen(List<Message<Integer>> data, Consumer<Integer> consumer, Acknowledgment acknowledgment) {
1125+
for (Message<Integer> datum : data) {
1126+
if (datum.getValue() == 5) {
1127+
throw new PulsarBatchListenerFailedException("failed", datum);
1128+
}
1129+
acknowledgement.acknowledge(datum.getMessageId());
1130+
}
1131+
}
1132+
1133+
@Bean
1134+
public PulsarConsumerErrorHandler<String> pulsarConsumerErrorHandler(
1135+
PulsarTemplate<String> pulsarTemplate) {
1136+
return new DefaultPulsarConsumerErrorHandler<>(
1137+
new PulsarDeadLetterPublishingRecoverer<>(pulsarTemplate, (c, m) -> "my-foo-dlt"), new FixedBackOff(100, 10));
1138+
}
1139+
1140+
@PulsarListener(subscriptionName = "my-dlt-subscription", topics = "my-foo-dlt")
1141+
void dltReceiver(Message<Integer> message) {
1142+
System.out.println("DLT - RECEIVED: " + message.getValue());
1143+
}
1144+
1145+
----
1146+
====
1147+
1148+
Once again, we re providing the property `pulsarConsumerErrorHandler` with the `PulsarConsumerErrorHandler` bean name.
1149+
When you are using a batch listener as above and want to use the `PulsarConsumerErrorHandler` from Spring for Apache Pulsar, then you need to use manual acknowledgment
1150+
This way you can acknowledge all the successful individual messages.
1151+
For the ones that fail, you must throw a `PulsarBatchListenerFailedException` with the message that it fails on.
1152+
Without this exception, the framework will not know what to do with the failure.
1153+
On retry, the container will send a new batch of messages, starting with the failed message to the listener.
1154+
If it fails again, it is retried, until the retries are exhausted, at which point the message will be sent to the DLT.
1155+
At that point, the message is acknowledged by the container and the listener will be handed over with the subsequent messages in the original batch.
1156+
8301157
==== Intercepting messages
8311158

8321159
===== Intercept messages on the Producer

0 commit comments

Comments
 (0)