From be2b36cff00c44ece576c565f0fb13ece32d5b75 Mon Sep 17 00:00:00 2001 From: majusko Date: Wed, 18 Jan 2023 08:58:01 +0100 Subject: [PATCH] Check for message id and not message. --- .../io/github/majusko/pulsar/consumer/ConsumerAggregator.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/io/github/majusko/pulsar/consumer/ConsumerAggregator.java b/src/main/java/io/github/majusko/pulsar/consumer/ConsumerAggregator.java index cb11698..7d5d412 100644 --- a/src/main/java/io/github/majusko/pulsar/consumer/ConsumerAggregator.java +++ b/src/main/java/io/github/majusko/pulsar/consumer/ConsumerAggregator.java @@ -177,7 +177,7 @@ private void createBatchListener(ConsumerHolder holder, final Consumer consum final Set ackSet = ackList.stream().collect(Collectors.toSet()); consumer.acknowledge(ackList); msgs.forEach((msg) -> { - if (!ackSet.contains(msg)) + if (!ackSet.contains(msg.getMessageId())) consumer.negativeAcknowledge(msg); }); } else if (!manualAckMode) {