diff --git a/src/main/java/com/rabbitmq/client/amqp/SynchronousConsumer.java b/src/main/java/com/rabbitmq/client/amqp/SynchronousConsumer.java new file mode 100644 index 000000000..dc5b9743d --- /dev/null +++ b/src/main/java/com/rabbitmq/client/amqp/SynchronousConsumer.java @@ -0,0 +1,42 @@ +// Copyright (c) 2025 Broadcom. All Rights Reserved. +// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// If you have any questions regarding licensing, please contact us at +// info@rabbitmq.com. +package com.rabbitmq.client.amqp; + +import java.time.Duration; +import java.util.List; + +public interface SynchronousConsumer extends AutoCloseable, Resource { + + Response get(Duration timeout); + + List get(int messageCount, Duration timeout); + + // CompletableFuture getAsync(Duration timeout); + // + // CompletableFuture> getAsync(int messageCount, Duration timeout); + + @Override + void close(); + + interface Response { + + Consumer.Context context(); + + Message message(); + } +} diff --git a/src/main/java/com/rabbitmq/client/amqp/impl/AmqpConsumer.java b/src/main/java/com/rabbitmq/client/amqp/impl/AmqpConsumer.java index 82d6edac5..22043cb74 100644 --- a/src/main/java/com/rabbitmq/client/amqp/impl/AmqpConsumer.java +++ b/src/main/java/com/rabbitmq/client/amqp/impl/AmqpConsumer.java @@ -21,7 +21,6 @@ import static com.rabbitmq.client.amqp.Resource.State.CLOSING; import static com.rabbitmq.client.amqp.Resource.State.OPEN; import static com.rabbitmq.client.amqp.impl.AmqpConsumerBuilder.*; -import static com.rabbitmq.client.amqp.metrics.MetricsCollector.ConsumeDisposition.*; import static java.time.Duration.ofSeconds; import static java.util.Optional.ofNullable; @@ -38,7 +37,6 @@ import java.util.stream.IntStream; import org.apache.qpid.protonj2.client.*; import org.apache.qpid.protonj2.client.exceptions.*; -import org.apache.qpid.protonj2.client.impl.ClientConversionSupport; import org.apache.qpid.protonj2.client.impl.ClientReceiver; import org.apache.qpid.protonj2.client.util.DeliveryQueue; import org.apache.qpid.protonj2.engine.EventHandler; @@ -47,14 +45,10 @@ import org.apache.qpid.protonj2.engine.impl.ProtonReceiver; import org.apache.qpid.protonj2.engine.impl.ProtonSessionIncomingWindow; import org.apache.qpid.protonj2.types.DescribedType; -import org.apache.qpid.protonj2.types.messaging.Accepted; -import org.apache.qpid.protonj2.types.messaging.Modified; -import org.apache.qpid.protonj2.types.messaging.Rejected; -import org.apache.qpid.protonj2.types.messaging.Released; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -final class AmqpConsumer extends ResourceBase implements Consumer { +final class AmqpConsumer extends ResourceBase implements Consumer, ConsumerUtils.CloseableConsumer { private static final AtomicLong ID_SEQUENCE = new AtomicLong(0); @@ -114,7 +108,7 @@ final class AmqpConsumer extends ResourceBase implements Consumer { .dispatch( () -> { // get result to make spotbugs happy - boolean ignored = maybeCloseConsumerOnException(this, e); + boolean ignored = ConsumerUtils.maybeCloseConsumerOnException(this, e); }); }; this.consumerWorkService = connection.consumerWorkService(); @@ -301,7 +295,7 @@ void recoverAfterConnectionFailure() { } } - void close(Throwable cause) { + public void close(Throwable cause) { if (this.closed.compareAndSet(false, true)) { this.state(CLOSING, cause); if (this.consumerWorkService != null) { @@ -362,7 +356,8 @@ private void initStateFromNativeReceiver(ClientReceiver receiver) { throw new AmqpException("Could not initialize consumer internal state"); } } catch (InterruptedException e) { - throw new RuntimeException(e); + Thread.currentThread().interrupt(); + throw new AmqpException(e); } } @@ -400,17 +395,13 @@ enum PauseStatus { PAUSED } - private static class DeliveryContext implements Consumer.Context { + private static class DeliveryContext extends ConsumerUtils.DeliveryContextBase { - private static final DeliveryState REJECTED = DeliveryState.rejected(null, null); - private final AtomicBoolean settled = new AtomicBoolean(false); - private final Delivery delivery; private final Scheduler protonExecutor; private final ProtonReceiver protonReceiver; private final MetricsCollector metricsCollector; private final AtomicLong unsettledMessageCount; private final Runnable replenishCreditOperation; - private final AmqpConsumer consumer; private DeliveryContext( Delivery delivery, @@ -420,48 +411,17 @@ private DeliveryContext( AtomicLong unsettledMessageCount, Runnable replenishCreditOperation, AmqpConsumer consumer) { - this.delivery = delivery; + super(delivery, consumer); this.protonExecutor = protonExecutor; this.protonReceiver = protonReceiver; this.metricsCollector = metricsCollector; this.unsettledMessageCount = unsettledMessageCount; this.replenishCreditOperation = replenishCreditOperation; - this.consumer = consumer; } @Override - public void accept() { - this.settle(DeliveryState.accepted(), ACCEPTED, "accept"); - } - - @Override - public void discard() { - settle(REJECTED, DISCARDED, "discard"); - } - - @Override - public void discard(Map annotations) { - annotations = annotations == null ? Collections.emptyMap() : annotations; - Utils.checkMessageAnnotations(annotations); - this.settle(DeliveryState.modified(true, true, annotations), DISCARDED, "discard (modified)"); - } - - @Override - public void requeue() { - settle(DeliveryState.released(), REQUEUED, "requeue"); - } - - @Override - public void requeue(Map annotations) { - annotations = annotations == null ? Collections.emptyMap() : annotations; - Utils.checkMessageAnnotations(annotations); - this.settle( - DeliveryState.modified(false, false, annotations), REQUEUED, "requeue (modified)"); - } - - @Override - public BatchContext batch(int batchSizeHint) { - return new BatchDeliveryContext( + public Consumer.BatchContext batch(int batchSizeHint) { + return new BatchContext( batchSizeHint, protonExecutor, protonReceiver, @@ -471,18 +431,13 @@ public BatchContext batch(int batchSizeHint) { consumer); } - private void settle( - DeliveryState state, MetricsCollector.ConsumeDisposition disposition, String label) { - if (settled.compareAndSet(false, true)) { - try { - protonExecutor.execute(replenishCreditOperation); - delivery.disposition(state, true); - unsettledMessageCount.decrementAndGet(); - metricsCollector.consumeDisposition(disposition); - } catch (Exception e) { - handleContextException(this.consumer, e, label); - } - } + @Override + protected void doSettle(DeliveryState state, MetricsCollector.ConsumeDisposition disposition) + throws Exception { + protonExecutor.execute(replenishCreditOperation); + delivery.disposition(state, true); + unsettledMessageCount.decrementAndGet(); + metricsCollector.consumeDisposition(disposition); } } @@ -491,142 +446,50 @@ public String toString() { return "AmqpConsumer{" + "id=" + id + ", queue='" + queue + '\'' + '}'; } - private static final class BatchDeliveryContext implements BatchContext { + private static final class BatchContext extends ConsumerUtils.BatchContextBase { - private static final org.apache.qpid.protonj2.types.transport.DeliveryState REJECTED = - new Rejected(); - private final List contexts; - private final AtomicBoolean settled = new AtomicBoolean(false); private final Scheduler protonExecutor; private final ProtonReceiver protonReceiver; private final MetricsCollector metricsCollector; private final AtomicLong unsettledMessageCount; private final Runnable replenishCreditOperation; - private final AmqpConsumer consumer; - private BatchDeliveryContext( + private BatchContext( int batchSizeHint, Scheduler protonExecutor, ProtonReceiver protonReceiver, MetricsCollector metricsCollector, AtomicLong unsettledMessageCount, Runnable replenishCreditOperation, - AmqpConsumer consumer) { - this.contexts = new ArrayList<>(batchSizeHint); + ConsumerUtils.CloseableConsumer consumer) { + super(batchSizeHint, consumer); this.protonExecutor = protonExecutor; this.protonReceiver = protonReceiver; this.metricsCollector = metricsCollector; this.unsettledMessageCount = unsettledMessageCount; this.replenishCreditOperation = replenishCreditOperation; - this.consumer = consumer; - } - - @Override - public void add(Consumer.Context context) { - if (this.settled.get()) { - throw new IllegalStateException("Batch is closed"); - } else { - if (context instanceof DeliveryContext) { - DeliveryContext dctx = (DeliveryContext) context; - // marking the context as settled avoids operation on it and deduplicates as well - if (dctx.settled.compareAndSet(false, true)) { - this.contexts.add(dctx); - } else { - throw new IllegalStateException("Message already settled"); - } - } else { - throw new IllegalArgumentException("Context type not supported: " + context); - } - } - } - - @Override - public int size() { - return this.contexts.size(); } @Override - public void accept() { - this.settle(Accepted.getInstance(), ACCEPTED, "accept"); - } - - @Override - public void discard() { - this.settle(REJECTED, DISCARDED, "discard"); - } - - @Override - public void discard(Map annotations) { - annotations = annotations == null ? Collections.emptyMap() : annotations; - Utils.checkMessageAnnotations(annotations); - Modified state = - new Modified(false, true, ClientConversionSupport.toSymbolKeyedMap(annotations)); - this.settle(state, DISCARDED, "discard (modified)"); - } - - @Override - public void requeue() { - this.settle(Released.getInstance(), REQUEUED, "requeue"); - } - - @Override - public void requeue(Map annotations) { - annotations = annotations == null ? Collections.emptyMap() : annotations; - Utils.checkMessageAnnotations(annotations); - Modified state = - new Modified(false, false, ClientConversionSupport.toSymbolKeyedMap(annotations)); - this.settle(state, REQUEUED, "requeue (modified)"); - } - - @Override - public BatchContext batch(int batchSizeHint) { - return this; - } - - private void settle( + protected void doSettle( org.apache.qpid.protonj2.types.transport.DeliveryState state, - MetricsCollector.ConsumeDisposition disposition, - String label) { - if (settled.compareAndSet(false, true)) { - int batchSize = this.contexts.size(); - try { - protonExecutor.execute(replenishCreditOperation); - long[][] ranges = - SerialNumberUtils.ranges(this.contexts, ctx -> ctx.delivery.getDeliveryId()); - this.protonExecutor.execute( - () -> { - for (long[] range : ranges) { - this.protonReceiver.disposition(state, range); - } + MetricsCollector.ConsumeDisposition disposition) { + int batchSize = this.size(); + protonExecutor.execute(replenishCreditOperation); + long[][] ranges = + SerialNumberUtils.ranges(this.contexts(), ctx -> ctx.delivery.getDeliveryId()); + this.protonExecutor.execute( + () -> { + for (long[] range : ranges) { + this.protonReceiver.disposition(state, range); + } + }); + unsettledMessageCount.addAndGet(-batchSize); + IntStream.range(0, batchSize) + .forEach( + ignored -> { + metricsCollector.consumeDisposition(disposition); }); - unsettledMessageCount.addAndGet(-batchSize); - IntStream.range(0, batchSize) - .forEach( - ignored -> { - metricsCollector.consumeDisposition(disposition); - }); - } catch (Exception e) { - handleContextException(this.consumer, e, label); - } - } - } - } - - private static void handleContextException( - AmqpConsumer consumer, Exception ex, String operation) { - if (maybeCloseConsumerOnException(consumer, ex)) { - return; } - if (ex instanceof ClientIllegalStateException - || ex instanceof RejectedExecutionException - || ex instanceof ClientIOException) { - LOGGER.debug("message {} failed: {}", operation, ex.getMessage()); - } else if (ex instanceof ClientException) { - throw ExceptionUtils.convert((ClientException) ex); - } - } - - private static boolean maybeCloseConsumerOnException(AmqpConsumer consumer, Exception ex) { - return ExceptionUtils.maybeCloseOnException(consumer::close, ex); } } diff --git a/src/main/java/com/rabbitmq/client/amqp/impl/AmqpSynchronousConsumer.java b/src/main/java/com/rabbitmq/client/amqp/impl/AmqpSynchronousConsumer.java new file mode 100644 index 000000000..c6454f0e6 --- /dev/null +++ b/src/main/java/com/rabbitmq/client/amqp/impl/AmqpSynchronousConsumer.java @@ -0,0 +1,273 @@ +// Copyright (c) 2025 Broadcom. All Rights Reserved. +// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// If you have any questions regarding licensing, please contact us at +// info@rabbitmq.com. +package com.rabbitmq.client.amqp.impl; + +import static com.rabbitmq.client.amqp.Resource.State.CLOSED; +import static com.rabbitmq.client.amqp.Resource.State.CLOSING; +import static com.rabbitmq.client.amqp.Resource.State.OPEN; +import static java.util.concurrent.TimeUnit.MILLISECONDS; + +import com.rabbitmq.client.amqp.AmqpException; +import com.rabbitmq.client.amqp.Consumer; +import com.rabbitmq.client.amqp.SynchronousConsumer; +import com.rabbitmq.client.amqp.metrics.MetricsCollector; +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.qpid.protonj2.client.Delivery; +import org.apache.qpid.protonj2.client.DeliveryMode; +import org.apache.qpid.protonj2.client.DeliveryState; +import org.apache.qpid.protonj2.client.ReceiverOptions; +import org.apache.qpid.protonj2.client.Session; +import org.apache.qpid.protonj2.client.exceptions.ClientException; +import org.apache.qpid.protonj2.client.impl.ClientReceiver; +import org.apache.qpid.protonj2.engine.Scheduler; +import org.apache.qpid.protonj2.engine.impl.ProtonReceiver; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +final class AmqpSynchronousConsumer extends ResourceBase + implements SynchronousConsumer, ConsumerUtils.CloseableConsumer { + + private static final Logger LOGGER = LoggerFactory.getLogger(AmqpSynchronousConsumer.class); + + private final String queue; + private final String address; + private final AmqpConnection connection; + private final SessionHandler sessionHandler; + private final ClientReceiver nativeReceiver; + private final AtomicBoolean closed = new AtomicBoolean(false); + // native receiver internal state, accessed only in the native executor/scheduler + private ProtonReceiver protonReceiver; + private volatile Scheduler protonExecutor; + + AmqpSynchronousConsumer(String queue, AmqpConnection connection, List listeners) { + super(listeners); + + DefaultAddressBuilder addressBuilder = Utils.addressBuilder(); + addressBuilder.queue(queue); + this.address = addressBuilder.address(); + this.queue = queue; + this.connection = connection; + this.sessionHandler = connection.createSessionHandler(); + this.nativeReceiver = this.createNativeReceiver(this.sessionHandler.session(), this.address); + this.initStateFromNativeReceiver(this.nativeReceiver); + this.state(OPEN); + } + + @Override + public Response get(Duration timeout) { + List responses = this.get(1, timeout); + return responses.isEmpty() ? null : responses.get(0); + } + + @Override + public List get(int messageCount, Duration timeout) { + checkOpen(); + try { + List messages = new ArrayList<>(messageCount); + nativeReceiver.addCredit(messageCount); + Delivery delivery = null; + while ((delivery = nativeReceiver.receive(timeout.toMillis(), MILLISECONDS)) != null) { + this.includeMessage(messages, delivery); + } + nativeReceiver.drain().get(timeout.toMillis(), MILLISECONDS); + while ((delivery = nativeReceiver.tryReceive()) != null) { + this.includeMessage(messages, delivery); + } + return messages; + } catch (ClientException e) { + throw ExceptionUtils.convert(e); + } catch (ExecutionException e) { + throw ExceptionUtils.convert(e); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new AmqpException(e); + } catch (TimeoutException e) { + throw new AmqpException(e); + } + } + + @Override + public void close() { + this.close(null); + } + + // internal API + + private ClientReceiver createNativeReceiver(Session nativeSession, String address) { + ReceiverOptions receiverOptions = + new ReceiverOptions() + .deliveryMode(DeliveryMode.AT_LEAST_ONCE) + .autoAccept(false) + .autoSettle(false) + .creditWindow(0); + + try { + ClientReceiver receiver = + (ClientReceiver) + ExceptionUtils.wrapGet( + nativeSession.openReceiver(address, receiverOptions).openFuture()); + return receiver; + } catch (ClientException e) { + throw ExceptionUtils.convert(e, "Error while creating receiver from '%s'", address); + } + } + + private void initStateFromNativeReceiver(ClientReceiver receiver) { + try { + Scheduler protonExecutor = receiver.executor(); + CountDownLatch fieldsSetLatch = new CountDownLatch(1); + protonExecutor.execute( + () -> { + this.protonReceiver = (ProtonReceiver) receiver.protonReceiver(); + this.protonExecutor = protonExecutor; + fieldsSetLatch.countDown(); + }); + if (!fieldsSetLatch.await(10, TimeUnit.SECONDS)) { + throw new AmqpException("Could not initialize consumer internal state"); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new AmqpException(e); + } + } + + private void includeMessage(List responses, Delivery delivery) { + AmqpMessage message = amqpMessage(delivery); + if (message != null) { + DeliveryContext deliveryContext = + new DeliveryContext(delivery, this.protonExecutor, this.protonReceiver, this); + responses.add(new DefaultResponse(message, deliveryContext)); + } + } + + public void close(Throwable cause) { + if (this.closed.compareAndSet(false, true)) { + this.state(CLOSING, cause); + try { + if (this.nativeReceiver != null) { + this.nativeReceiver.close(); + } + this.sessionHandler.close(); + } catch (Exception e) { + LOGGER.warn("Error while closing receiver", e); + } + this.state(CLOSED, cause); + } + } + + private static AmqpMessage amqpMessage(Delivery delivery) { + try { + AmqpMessage message = new AmqpMessage(delivery.message()); + return message; + } catch (ClientException e) { + LOGGER.warn("Error while decoding message: {}", e.getMessage()); + try { + delivery.disposition(DeliveryState.rejected("", ""), true); + } catch (ClientException ex) { + LOGGER.warn("Error while rejecting non-decoded message: {}", ex.getMessage()); + } + return null; + } + } + + private static class DeliveryContext extends ConsumerUtils.DeliveryContextBase { + + private final Scheduler protonExecutor; + private final ProtonReceiver protonReceiver; + + private DeliveryContext( + Delivery delivery, + Scheduler protonExecutor, + ProtonReceiver protonReceiver, + AmqpSynchronousConsumer consumer) { + super(delivery, consumer); + this.protonExecutor = protonExecutor; + this.protonReceiver = protonReceiver; + } + + @Override + public Consumer.BatchContext batch(int batchSizeHint) { + return new BatchContext( + batchSizeHint, this.protonExecutor, this.protonReceiver, this.consumer); + } + + @Override + protected void doSettle(DeliveryState state, MetricsCollector.ConsumeDisposition disposition) + throws Exception { + delivery.disposition(state, true); + } + } + + private static class BatchContext extends ConsumerUtils.BatchContextBase { + + private final Scheduler protonExecutor; + private final ProtonReceiver protonReceiver; + + private BatchContext( + int batchSizeHint, + Scheduler protonExecutor, + ProtonReceiver protonReceiver, + ConsumerUtils.CloseableConsumer consumer) { + super(batchSizeHint, consumer); + this.protonExecutor = protonExecutor; + this.protonReceiver = protonReceiver; + } + + @Override + protected void doSettle( + org.apache.qpid.protonj2.types.transport.DeliveryState state, + MetricsCollector.ConsumeDisposition disposition) { + long[][] ranges = + SerialNumberUtils.ranges(this.contexts(), ctx -> ctx.delivery.getDeliveryId()); + this.protonExecutor.execute( + () -> { + for (long[] range : ranges) { + this.protonReceiver.disposition(state, range); + } + }); + } + } + + private static class DefaultResponse implements Response { + + private final AmqpMessage message; + private final Consumer.Context context; + + private DefaultResponse(AmqpMessage message, Consumer.Context context) { + this.message = message; + this.context = context; + } + + @Override + public AmqpMessage message() { + return message; + } + + @Override + public Consumer.Context context() { + return context; + } + } +} diff --git a/src/main/java/com/rabbitmq/client/amqp/impl/ConsumerUtils.java b/src/main/java/com/rabbitmq/client/amqp/impl/ConsumerUtils.java new file mode 100644 index 000000000..c63f59599 --- /dev/null +++ b/src/main/java/com/rabbitmq/client/amqp/impl/ConsumerUtils.java @@ -0,0 +1,241 @@ +// Copyright (c) 2025 Broadcom. All Rights Reserved. +// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// If you have any questions regarding licensing, please contact us at +// info@rabbitmq.com. +package com.rabbitmq.client.amqp.impl; + +import static com.rabbitmq.client.amqp.metrics.MetricsCollector.ConsumeDisposition.ACCEPTED; +import static com.rabbitmq.client.amqp.metrics.MetricsCollector.ConsumeDisposition.DISCARDED; +import static com.rabbitmq.client.amqp.metrics.MetricsCollector.ConsumeDisposition.REQUEUED; + +import com.rabbitmq.client.amqp.Consumer; +import com.rabbitmq.client.amqp.metrics.MetricsCollector; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.qpid.protonj2.client.Delivery; +import org.apache.qpid.protonj2.client.DeliveryState; +import org.apache.qpid.protonj2.client.exceptions.ClientException; +import org.apache.qpid.protonj2.client.exceptions.ClientIOException; +import org.apache.qpid.protonj2.client.exceptions.ClientIllegalStateException; +import org.apache.qpid.protonj2.client.impl.ClientConversionSupport; +import org.apache.qpid.protonj2.types.messaging.Accepted; +import org.apache.qpid.protonj2.types.messaging.Modified; +import org.apache.qpid.protonj2.types.messaging.Rejected; +import org.apache.qpid.protonj2.types.messaging.Released; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +final class ConsumerUtils { + + private static final Logger LOGGER = LoggerFactory.getLogger(ConsumerUtils.class); + + private ConsumerUtils() {} + + static void handleContextException(CloseableConsumer consumer, Exception ex, String operation) { + if (maybeCloseConsumerOnException(consumer, ex)) { + return; + } + if (ex instanceof ClientIllegalStateException + || ex instanceof RejectedExecutionException + || ex instanceof ClientIOException) { + LOGGER.debug("message {} failed: {}", operation, ex.getMessage()); + } else if (ex instanceof ClientException) { + throw ExceptionUtils.convert((ClientException) ex); + } + } + + static boolean maybeCloseConsumerOnException(CloseableConsumer consumer, Exception ex) { + return ExceptionUtils.maybeCloseOnException(consumer::close, ex); + } + + abstract static class ContextBase implements Consumer.Context { + + static final DeliveryState REJECTED = DeliveryState.rejected(null, null); + private final AtomicBoolean settled = new AtomicBoolean(false); + protected final CloseableConsumer consumer; + + protected ContextBase(CloseableConsumer consumer) { + this.consumer = consumer; + } + + protected boolean settleState() { + return this.settled.compareAndSet(false, true); + } + + protected boolean isSettled() { + return this.settled.get(); + } + } + + abstract static class DeliveryContextBase extends ContextBase { + + protected final Delivery delivery; + + protected DeliveryContextBase(Delivery delivery, CloseableConsumer consumer) { + super(consumer); + this.delivery = delivery; + } + + @Override + public void accept() { + this.settle(DeliveryState.accepted(), ACCEPTED, "accept"); + } + + @Override + public void discard() { + this.settle(REJECTED, DISCARDED, "discard"); + } + + @Override + public void discard(Map annotations) { + annotations = annotations == null ? Collections.emptyMap() : annotations; + Utils.checkMessageAnnotations(annotations); + this.settle(DeliveryState.modified(true, true, annotations), DISCARDED, "discard (modified)"); + } + + @Override + public void requeue() { + this.settle(DeliveryState.released(), REQUEUED, "requeue"); + } + + @Override + public void requeue(Map annotations) { + annotations = annotations == null ? Collections.emptyMap() : annotations; + Utils.checkMessageAnnotations(annotations); + this.settle( + DeliveryState.modified(false, false, annotations), REQUEUED, "requeue (modified)"); + } + + protected abstract void doSettle( + DeliveryState state, MetricsCollector.ConsumeDisposition disposition) throws Exception; + + private void settle( + DeliveryState state, MetricsCollector.ConsumeDisposition disposition, String label) { + if (settleState()) { + try { + doSettle(state, disposition); + } catch (Exception e) { + ConsumerUtils.handleContextException(this.consumer, e, label); + } + } + } + } + + abstract static class BatchContextBase extends ContextBase implements Consumer.BatchContext { + + private static final org.apache.qpid.protonj2.types.transport.DeliveryState REJECTED = + new Rejected(); + private final List contexts; + + protected BatchContextBase(int batchSizeHint, ConsumerUtils.CloseableConsumer consumer) { + super(consumer); + this.contexts = new ArrayList<>(batchSizeHint); + } + + @Override + public void add(Consumer.Context context) { + if (isSettled()) { + throw new IllegalStateException("Batch is closed"); + } else { + if (context instanceof DeliveryContextBase) { + DeliveryContextBase dctx = (DeliveryContextBase) context; + // marking the context as settled avoids operation on it and deduplicates as well + if (dctx.settleState()) { + this.contexts.add(dctx); + } else { + throw new IllegalStateException("Message already settled"); + } + } else { + throw new IllegalArgumentException("Context type not supported: " + context); + } + } + } + + @Override + public int size() { + return this.contexts.size(); + } + + @Override + public Consumer.BatchContext batch(int batchSizeHint) { + return this; + } + + @Override + public void accept() { + this.settle(Accepted.getInstance(), ACCEPTED, "accept"); + } + + @Override + public void discard() { + this.settle(REJECTED, DISCARDED, "discard"); + } + + @Override + public void discard(Map annotations) { + annotations = annotations == null ? Collections.emptyMap() : annotations; + Utils.checkMessageAnnotations(annotations); + Modified state = + new Modified(false, true, ClientConversionSupport.toSymbolKeyedMap(annotations)); + this.settle(state, DISCARDED, "discard (modified)"); + } + + @Override + public void requeue() { + this.settle(Released.getInstance(), REQUEUED, "requeue"); + } + + @Override + public void requeue(Map annotations) { + annotations = annotations == null ? Collections.emptyMap() : annotations; + Utils.checkMessageAnnotations(annotations); + Modified state = + new Modified(false, false, ClientConversionSupport.toSymbolKeyedMap(annotations)); + this.settle(state, REQUEUED, "requeue (modified)"); + } + + protected List contexts() { + return this.contexts; + } + + protected abstract void doSettle( + org.apache.qpid.protonj2.types.transport.DeliveryState state, + MetricsCollector.ConsumeDisposition disposition) + throws Exception; + + private void settle( + org.apache.qpid.protonj2.types.transport.DeliveryState state, + MetricsCollector.ConsumeDisposition disposition, + String label) { + if (settleState()) { + try { + doSettle(state, disposition); + } catch (Exception e) { + ConsumerUtils.handleContextException(this.consumer, e, label); + } + } + } + } + + interface CloseableConsumer { + + void close(Throwable e); + } +} diff --git a/src/test/java/com/rabbitmq/client/amqp/impl/AmqpAsynchronousConsumerTest.java b/src/test/java/com/rabbitmq/client/amqp/impl/AmqpAsynchronousConsumerTest.java new file mode 100644 index 000000000..4d9eb02d9 --- /dev/null +++ b/src/test/java/com/rabbitmq/client/amqp/impl/AmqpAsynchronousConsumerTest.java @@ -0,0 +1,192 @@ +// Copyright (c) 2025 Broadcom. All Rights Reserved. +// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// If you have any questions regarding licensing, please contact us at +// info@rabbitmq.com. +package com.rabbitmq.client.amqp.impl; + +import static com.rabbitmq.client.amqp.impl.Assertions.assertThat; +import static com.rabbitmq.client.amqp.impl.TestUtils.name; +import static com.rabbitmq.client.amqp.impl.TestUtils.waitAtMost; +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.assertj.core.api.Assertions.assertThat; + +import com.rabbitmq.client.amqp.Consumer; +import com.rabbitmq.client.amqp.Publisher; +import com.rabbitmq.client.amqp.SynchronousConsumer; +import java.time.Duration; +import java.util.List; +import java.util.Random; +import java.util.UUID; +import java.util.stream.IntStream; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInfo; + +@AmqpTestInfrastructure +public class AmqpAsynchronousConsumerTest { + + Duration timeout = Duration.ofMillis(100); + AmqpConnection connection; + Publisher publisher; + SynchronousConsumer consumer; + String q; + String connectionName; + int messageCount = 10; + List responses; + + @BeforeEach + void init(TestInfo info) { + this.q = name(info); + this.connectionName = connection.name(); + this.connection.management().queue(this.q).exclusive(true).declare(); + this.publisher = connection.publisherBuilder().queue(this.q).build(); + this.consumer = new AmqpSynchronousConsumer(this.q, this.connection, List.of()); + } + + @AfterEach + void tearDown() { + this.consumer.close(); + this.publisher.close(); + } + + @Test + void requeuedMessageShouldComeBackOnNextGet() { + assertThat(consumer.get(timeout)).isNull(); + byte[] body = UUID.randomUUID().toString().getBytes(UTF_8); + publisher.publish(publisher.message(body), ctx -> {}); + waitAtMost(() -> queueMessageCount() == 1); + + // get and requeue + SynchronousConsumer.Response response = consumer.get(timeout); + assertThat(response).isNotNull(); + assertThat(response.message()).hasBody(body); + waitAtMost(() -> queueMessageCount() == 0); + response.context().requeue(); + waitAtMost(() -> queueMessageCount() == 1); + + // get again and accept + response = consumer.get(timeout); + assertThat(response).isNotNull(); + assertThat(response.message()).hasBody(body); + response.context().accept(); + waitAtMost(() -> queueMessageCount() == 0); + } + + @Test + void multipleGetShouldReturnRequestedNumberOfMessages() { + publish(messageCount); + responses = consumer.get(messageCount, timeout); + assertThat(responses).hasSize(messageCount); + accept(responses); + waitAtMost(() -> queueMessageCount() == 0); + } + + @Test + void multipleGetShouldReturnAllQueueMessagesIfRequestedGreaterThanCount() { + publish(messageCount); + responses = consumer.get(messageCount + 2, timeout); + assertThat(responses).hasSize(messageCount); + accept(responses); + waitAtMost(() -> queueMessageCount() == 0); + } + + @Test + void multipleGetShouldNotEmptyQueue() { + publish(messageCount * 2); + responses = consumer.get(messageCount, timeout); + assertThat(responses).hasSize(messageCount); + accept(responses); + waitAtMost(() -> queueMessageCount() == messageCount); + + responses = consumer.get(messageCount, timeout); + assertThat(responses).hasSize(messageCount); + accept(responses); + waitAtMost(() -> queueMessageCount() == 0); + } + + @Test + void multipleGetShouldReturnEmptyListIfNoMessages() { + responses = consumer.get(messageCount, timeout); + assertThat(responses).isEmpty(); + } + + @Test + void multipleGetAndBatchAccept() { + publish(messageCount * 2); + responses = consumer.get(messageCount, timeout); + assertThat(responses).hasSize(messageCount); + Consumer.BatchContext batch = null; + for (SynchronousConsumer.Response r : responses) { + if (batch == null) { + batch = r.context().batch(messageCount); + } + batch.add(r.context()); + } + batch.accept(); + waitAtMost(() -> queueMessageCount() == messageCount); + + responses = consumer.get(messageCount, timeout); + assertThat(responses).hasSize(messageCount); + batch = null; + for (SynchronousConsumer.Response r : responses) { + if (batch == null) { + batch = r.context().batch(messageCount); + } + batch.add(r.context()); + } + batch.accept(); + waitAtMost(() -> queueMessageCount() == 0); + } + + @Test + void requeuedMessageNotInBatchShouldGoBackToQueue() { + publish(messageCount); + responses = consumer.get(messageCount, timeout); + assertThat(responses).hasSize(messageCount); + Consumer.BatchContext batch = null; + int indexToRequeue = new Random().nextInt(messageCount); + for (int i = 0; i < messageCount; i++) { + if (i == indexToRequeue) { + responses.get(i).context().requeue(); + } else { + if (batch == null) { + batch = responses.get(i).context().batch(messageCount - 1); + } + batch.add(responses.get(i).context()); + } + } + batch.accept(); + waitAtMost(() -> queueMessageCount() == 1); + + responses = consumer.get(messageCount, timeout); + assertThat(responses).hasSize(1); + responses.get(0).context().accept(); + waitAtMost(() -> queueMessageCount() == 0); + } + + private void publish(int count) { + IntStream.range(0, count).forEach(ignored -> publisher.publish(publisher.message(), ctx -> {})); + } + + private void accept(List responses) { + responses.forEach(response -> response.context().accept()); + } + + private long queueMessageCount() { + return connection.management().queueInfo(q).messageCount(); + } +} diff --git a/src/test/java/com/rabbitmq/client/amqp/impl/ClientTest.java b/src/test/java/com/rabbitmq/client/amqp/impl/ClientTest.java index 0cce5c120..553252ead 100644 --- a/src/test/java/com/rabbitmq/client/amqp/impl/ClientTest.java +++ b/src/test/java/com/rabbitmq/client/amqp/impl/ClientTest.java @@ -21,6 +21,7 @@ import static com.rabbitmq.client.amqp.impl.TestConditions.BrokerVersion.RABBITMQ_4_1_0; import static com.rabbitmq.client.amqp.impl.TestUtils.*; import static java.nio.charset.StandardCharsets.*; +import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.SECONDS; import static org.apache.qpid.protonj2.client.DeliveryMode.AT_LEAST_ONCE; import static org.apache.qpid.protonj2.client.DeliveryState.released; @@ -34,17 +35,25 @@ import java.io.ByteArrayOutputStream; import java.io.InputStream; import java.io.OutputStream; +import java.time.Duration; import java.util.*; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.function.IntConsumer; import java.util.stream.IntStream; import org.apache.qpid.protonj2.client.*; import org.apache.qpid.protonj2.client.Message; import org.apache.qpid.protonj2.client.exceptions.ClientConnectionRemotelyClosedException; +import org.apache.qpid.protonj2.client.exceptions.ClientException; import org.apache.qpid.protonj2.client.exceptions.ClientLinkRemotelyClosedException; +import org.apache.qpid.protonj2.client.impl.ClientReceiver; import org.apache.qpid.protonj2.types.UnsignedLong; import org.junit.jupiter.api.*; @@ -423,4 +432,100 @@ void dynamicReceiver() throws Exception { assertThat(delivery.message().body()).isEqualTo(body); } } + + @Test + @BrokerVersionAtLeast(RABBITMQ_4_1_0) + void asynchronousGet() throws Exception { + try (Client client = client()) { + org.apache.qpid.protonj2.client.Connection c1 = connection(client, o -> o.traceFrames(true)); + Session s1 = c1.openSession(); + ReceiverOptions receiverOptions = + new ReceiverOptions().deliveryMode(AT_LEAST_ONCE).autoAccept(false).creditWindow(0); + receiverOptions.sourceOptions().capabilities("temporary-queue"); + ClientReceiver receiver = (ClientReceiver) s1.openDynamicReceiver(receiverOptions); + receiver.openFuture().get(); + assertThat(receiver.address()).isNotNull(); + + org.apache.qpid.protonj2.client.Connection c2 = connection(client); + Session s2 = c2.openSession(); + Sender sender = s2.openSender(receiver.address()); + String body = UUID.randomUUID().toString(); + sender.send(Message.create(body)); + + IntConsumer send = + count -> { + for (int i = 0; i < count; i++) { + try { + sender.send(Message.create("test")); + } catch (ClientException e) { + throw new RuntimeException(e); + } + } + }; + + Consumer accept = + delivery -> { + try { + delivery.disposition(DeliveryState.accepted(), true); + } catch (ClientException e) { + throw new RuntimeException(e); + } + }; + + Duration timeout = Duration.ofMillis(100L); + Function> receive = + messageCount -> { + try { + List messages = new ArrayList<>(messageCount); + receiver.addCredit(messageCount); + Delivery delivery = null; + while ((delivery = receiver.receive(timeout.toMillis(), MILLISECONDS)) != null) { + messages.add(delivery); + } + receiver.drain().get(timeout.toMillis(), MILLISECONDS); + delivery = receiver.tryReceive(); + if (delivery != null) { + messages.add(delivery); + } + return messages; + } catch (ClientException + | InterruptedException + | ExecutionException + | TimeoutException e) { + throw new RuntimeException(e); + } + }; + + List deliveries = receive.apply(1); + assertThat(deliveries).hasSize(1); + assertThat(deliveries.get(0).message().body()).isEqualTo(body); + accept.accept(deliveries.get(0)); + + deliveries = receive.apply(1); + assertThat(deliveries).isEmpty(); + + int messageCount = 10; + send.accept(messageCount); + deliveries = receive.apply(messageCount); + assertThat(deliveries).hasSize(messageCount); + deliveries.forEach(accept); + + send.accept(messageCount); + deliveries = receive.apply(messageCount + 2); + assertThat(deliveries).hasSize(messageCount); + deliveries.forEach(accept); + + send.accept(messageCount * 2); + deliveries = receive.apply(messageCount); + assertThat(deliveries).hasSize(messageCount); + deliveries.forEach(accept); + + deliveries = receive.apply(messageCount); + assertThat(deliveries).hasSize(messageCount); + deliveries.forEach(accept); + + deliveries = receive.apply(messageCount); + assertThat(deliveries).isEmpty(); + } + } }