diff --git a/src/main/java/io/github/majusko/pulsar/consumer/ConsumerAggregator.java b/src/main/java/io/github/majusko/pulsar/consumer/ConsumerAggregator.java index cb11698..7d5d412 100644 --- a/src/main/java/io/github/majusko/pulsar/consumer/ConsumerAggregator.java +++ b/src/main/java/io/github/majusko/pulsar/consumer/ConsumerAggregator.java @@ -177,7 +177,7 @@ private void createBatchListener(ConsumerHolder holder, final Consumer consum final Set ackSet = ackList.stream().collect(Collectors.toSet()); consumer.acknowledge(ackList); msgs.forEach((msg) -> { - if (!ackSet.contains(msg)) + if (!ackSet.contains(msg.getMessageId())) consumer.negativeAcknowledge(msg); }); } else if (!manualAckMode) {