SAC: message delivery halts when a second consumer with a higher priority is introduced #14142
-
Describe the bug
When using a quorum queue with the following arguments:
A first consumer is created with priority = 0 and QoS = 1. This consumer starts and requeues the delivered messages.
Reproduction steps
Expected behavior
After Consumer B (with higher priority) is started, message delivery should switch to it, as per the x-single-active-consumer and priority settings.
Additional context
RabbitMQ versions tested:
A test to check this issue:
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.RabbitMQContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
@Testcontainers
class SingleActiveConsumerWithPrioTest {
@Container
private static final RabbitMQContainer rabbitMQContainer =
new RabbitMQContainer("rabbitmq:4.1-alpine");
private static final String QUEUE_NAME = "test-queue";
private static ConnectionFactory connectionFactory;
private Connection connection;
@BeforeAll
static void setUpAll() throws Exception {
createConnectionFactory();
prepareQueue();
}
static void createConnectionFactory() {
connectionFactory = new ConnectionFactory();
connectionFactory.setHost(rabbitMQContainer.getHost());
connectionFactory.setPort(rabbitMQContainer.getAmqpPort());
connectionFactory.setUsername(rabbitMQContainer.getAdminUsername());
connectionFactory.setPassword(rabbitMQContainer.getAdminPassword());
}
static void prepareQueue() throws Exception {
try (Connection connection = connectionFactory.newConnection()) {
try (var channel = connection.createChannel()) {
Map<String, Object> arguments = new HashMap<>();
arguments.put("x-queue-type", "quorum");
arguments.put("x-single-active-consumer", true);
arguments.put("x-delivery-limit", -1);
channel.queueDeclare(QUEUE_NAME, true, false, false, arguments);
for (int i = 0; i < 100; i++) {
// Publish some test messages
channel.basicPublish("", QUEUE_NAME, null, ("Test message " + i).getBytes());
}
}
}
}
@BeforeEach
void setUp() throws Exception {
connection = connectionFactory.newConnection();
}
@AfterEach
void tearDown() throws Exception {
if (connection != null && connection.isOpen()) {
connection.close();
}
}
@Test
@DisplayName(
"Given a consumer that requeues messages, when another consumer is created with more priority, then consuming gets stuck")
void givenAConsumerThatRequeue_whenOtherConsumerIsCreatedWithMorePriority_thenConsumingGetsStuck()
throws Exception {
// given
Channel channel1 = connection.createChannel();
channel1.basicQos(1); // Set QoS to 1 to ensure single message delivery
Map<String, Object> args1 = new HashMap<>();
args1.put("x-priority", 0); // Lower priority for the first consumer
AtomicInteger counterConsumer1 = new AtomicInteger(0);
channel1.basicConsume(
QUEUE_NAME,
false,
args1,
(consumerTag, message) -> {
System.out
.format(
"Message received for consumer 1 %s: %s",
consumerTag, new String(message.getBody()))
.println();
counterConsumer1.incrementAndGet();
// Simulate processing time
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
channel1.basicNack(
message.getEnvelope().getDeliveryTag(), false, true); // Requeue the message
},
consumerTag -> {});
// Waits to see that the first consumer is consuming
System.out.println("First consumer should consuming...");
await().pollDelay(10, TimeUnit.SECONDS).timeout(11, TimeUnit.SECONDS).until(() -> true);
assertThat(counterConsumer1.get()).isGreaterThan(0); // Ensure the first consumer has processed
// at least one message
// when
Channel channel2 = connection.createChannel();
Map<String, Object> args2 = new HashMap<>();
args2.put("x-priority", 100); // Higher priority for the second consumer
AtomicInteger counterConsumer2 = new AtomicInteger(0);
channel2.basicConsume(
QUEUE_NAME,
false,
args2,
(consumerTag, message) -> {
System.out
.format(
"Message received for consumer 2 %s: %s",
consumerTag, new String(message.getBody()))
.println();
// Simulate processing time
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
counterConsumer2.incrementAndGet();
channel2.basicAck(message.getEnvelope().getDeliveryTag(), false);
},
consumerTag -> {});
int consumption1 = counterConsumer1.get();
// Waits to see if second consumer is consuming
System.out.println("Second consumer should consuming...");
await().pollDelay(10, TimeUnit.SECONDS).timeout(11, TimeUnit.SECONDS).until(() -> true);
// then
// The second consumer does not consume any messages
assertThat(counterConsumer2.get()).isZero();
// The first consumer stopped after second consumer started
assertThat(counterConsumer1.get()).isEqualTo(consumption1);
// cleanup
channel1.close();
channel2.close();
}
} |
Beta Was this translation helpful? Give feedback.
Replies: 3 comments 6 replies
-
@elhectormento from the Single Active Consumer documentation:
Perhaps you have missed this part specifically:
We cannot suggest anything else without knowing what RabbitMQ version is used and logs from the node those consumers connect to, and if there are multiple nodes, all of them. |
Beta Was this translation helpful? Give feedback.
-
@acogoluegnes also points out that the redelivery loop protection that quorum queues have can play a role in a test like this. If the current SAC keeps requeueing its deliveries, this will create a condition where there are always pending deliveries to the same active consumer, so the redelivery loop protection will kick in, and the flow of deliveries will stop. |
Beta Was this translation helpful? Give feedback.
-
@kjnilsson confirmed it is a bug, I'll create a follow-up issue. |
Beta Was this translation helpful? Give feedback.
@kjnilsson confirmed it is a bug, I'll create a follow-up issue.