Skip to content

Commit 3f33f0f

Browse files
committed
Fix spring-rabbitmq-client for latest API
* Fix deprecation in the `RabbitConnectionFactoryBean`
1 parent 2d3df44 commit 3f33f0f

File tree

3 files changed

+49
-13
lines changed

3 files changed

+49
-13
lines changed

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/RabbitConnectionFactoryBean.java

Lines changed: 25 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@
5959
import org.springframework.util.StringUtils;
6060

6161
/**
62-
* Factory bean to create a RabbitMQ ConnectionFactory, delegating most setter methods and
62+
* The factory bean to create a RabbitMQ ConnectionFactory, delegating most setter methods and
6363
* optionally enabling SSL, with or without certificate validation. When
6464
* {@link #setSslPropertiesLocation(Resource) sslPropertiesLocation} is not null, the
6565
* default implementation loads a {@code PKCS12} keystore and a {@code JKS} truststore
@@ -80,6 +80,7 @@
8080
* @author Dominique Villard
8181
* @author Zachary DeLuca
8282
* @author Ngoc Nhan
83+
* @author Artem Bilan
8384
*
8485
* @since 1.4
8586
*/
@@ -167,16 +168,16 @@ public RabbitConnectionFactoryBean() {
167168
}
168169

169170
/**
170-
* Whether Server Side certificate has to be validated or not.
171-
* @return true if Server Side certificate has to be skipped
171+
* Whether a Server Side certificate has to be validated or not.
172+
* @return true if the Server Side certificate has to be skipped
172173
* @since 1.6.6
173174
*/
174175
public boolean isSkipServerCertificateValidation() {
175176
return this.skipServerCertificateValidation;
176177
}
177178

178179
/**
179-
* Whether Server Side certificate has to be validated or not.
180+
* Whether a Server Side certificate has to be validated or not.
180181
* This would be used if useSSL is set to true and should only be used on dev or Qa regions
181182
* skipServerCertificateValidation should <b> never be set to true in production</b>
182183
* @param skipServerCertificateValidation Flag to override Server side certificate checks;
@@ -598,8 +599,10 @@ public void setExceptionHandler(ExceptionHandler exceptionHandler) {
598599
/**
599600
* Whether the factory should be configured to use Java NIO.
600601
* @param useNio true to use Java NIO, false to use blocking IO
602+
* @deprecated since 4.0 in favor of {@link #setUseNetty(boolean)}
601603
* @see com.rabbitmq.client.ConnectionFactory#useNio()
602604
*/
605+
@Deprecated(since = "4.0", forRemoval = true)
603606
public void setUseNio(boolean useNio) {
604607
if (useNio) {
605608
this.connectionFactory.useNio();
@@ -609,10 +612,26 @@ public void setUseNio(boolean useNio) {
609612
}
610613
}
611614

615+
/**
616+
* Whether the factory should be configured to use Netty.
617+
* @param useNetty true to use Netty, false to use blocking IO
618+
* @see com.rabbitmq.client.ConnectionFactory#netty()
619+
*/
620+
public void setUseNetty(boolean useNetty) {
621+
if (useNetty) {
622+
this.connectionFactory.netty();
623+
}
624+
else {
625+
this.connectionFactory.useBlockingIo();
626+
}
627+
}
628+
612629
/**
613630
* @param nioParams the NIO parameters
631+
* @deprecated since 4.0 in favor of {@link #setUseNetty(boolean)}
614632
* @see com.rabbitmq.client.ConnectionFactory#setNioParams(com.rabbitmq.client.impl.nio.NioParams)
615633
*/
634+
@Deprecated(since = "4.0", forRemoval = true)
616635
public void setNioParams(NioParams nioParams) {
617636
this.connectionFactory.setNioParams(nioParams);
618637
}
@@ -627,7 +646,7 @@ public void setMetricsCollector(MetricsCollector metricsCollector) {
627646

628647
/**
629648
* Set to true to enable amqp-client automatic recovery. Note: Spring AMQP
630-
* implements its own connection recovery and this is generally not needed.
649+
* implements its own connection recovery, and this is generally not needed.
631650
* @param automaticRecoveryEnabled true to enable.
632651
* @since 1.7.1
633652
*/
@@ -638,7 +657,7 @@ public void setAutomaticRecoveryEnabled(boolean automaticRecoveryEnabled) {
638657
/**
639658
* Set to true to enable amqp-client topology recovery. Note: if there is a
640659
* Rabbit admin in the application context, Spring AMQP
641-
* implements its own recovery and this is generally not needed.
660+
* implements its own recovery, and this is generally not needed.
642661
* @param topologyRecoveryEnabled true to enable.
643662
* @since 1.7.1
644663
*/

spring-rabbitmq-client/src/main/java/org/springframework/amqp/rabbitmq/client/RabbitAmqpTemplate.java

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,8 @@
2828
import com.rabbitmq.client.amqp.Consumer;
2929
import com.rabbitmq.client.amqp.Environment;
3030
import com.rabbitmq.client.amqp.Publisher;
31+
import com.rabbitmq.client.amqp.Requester;
3132
import com.rabbitmq.client.amqp.Resource;
32-
import com.rabbitmq.client.amqp.RpcClient;
3333
import org.jspecify.annotations.Nullable;
3434

3535
import org.springframework.amqp.AmqpIllegalStateException;
@@ -171,7 +171,11 @@ private String getRequiredQueue() throws IllegalStateException {
171171
return name;
172172
}
173173

174-
private Publisher getPublisher() {
174+
/**
175+
* Return the {@link Publisher} for low-level AMQP operations.
176+
* @return the {@link Publisher} for low-level AMQP operations.
177+
*/
178+
public Publisher getPublisher() {
175179
Object publisherToReturn = this.publisher;
176180
if (publisherToReturn == null) {
177181
this.instanceLock.lock();
@@ -524,7 +528,7 @@ private CompletableFuture<Message> doSendAndReceive(@Nullable String exchange, @
524528
// To associate a response with a request, the correlation-id value of the response properties
525529
// MUST be set to the message-id value of the request properties.
526530
// So, this supplier will override request message-id, respectively.
527-
// Otherwise, the RpcClient generates correlation-id internally.
531+
// Otherwise, the Requester generates correlation-id internally.
528532
Supplier<Object> correlationIdSupplier = null;
529533
if (StringUtils.hasText(correlationId)) {
530534
correlationIdSupplier = () -> correlationId;
@@ -534,15 +538,15 @@ else if (StringUtils.hasText(messageId)) {
534538
}
535539

536540
// The default reply-to queue, or the one supplied in the message.
537-
// Otherwise, the RpcClient generates one as exclusive and auto-delete.
541+
// Otherwise, the Requester generates one as exclusive and auto-delete.
538542
String replyToQueue = this.defaultReplyToQueue;
539543
if (StringUtils.hasText(replyTo)) {
540544
replyToQueue = replyTo;
541545
}
542546

543-
RpcClient rpcClient =
547+
Requester rpcClient =
544548
this.connectionFactory.getConnection()
545-
.rpcClientBuilder()
549+
.requesterBuilder()
546550
.requestTimeout(this.publishTimeout)
547551
.correlationIdSupplier(correlationIdSupplier)
548552
.replyToQueue(replyToQueue)

spring-rabbitmq-client/src/main/java/org/springframework/amqp/rabbitmq/client/listener/RabbitAmqpMessageListenerAdapter.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,12 +24,14 @@
2424

2525
import com.rabbitmq.client.Channel;
2626
import com.rabbitmq.client.amqp.Consumer;
27+
import com.rabbitmq.client.amqp.Publisher;
2728
import org.jspecify.annotations.Nullable;
2829

2930
import org.springframework.amqp.core.Address;
3031
import org.springframework.amqp.core.AmqpAcknowledgment;
3132
import org.springframework.amqp.core.Message;
3233
import org.springframework.amqp.core.MessagePostProcessor;
34+
import org.springframework.amqp.rabbit.core.AmqpNackReceivedException;
3335
import org.springframework.amqp.rabbit.listener.adapter.InvocationResult;
3436
import org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter;
3537
import org.springframework.amqp.rabbit.listener.api.RabbitListenerErrorHandler;
@@ -156,7 +158,18 @@ protected void sendResponse(@Nullable Channel channel, Address replyTo, Message
156158
}
157159
else {
158160
Assert.hasText(replyToRoutingKey, "The 'replyTo' must be provided, in request message or in @SendTo.");
159-
sendFuture = this.rabbitAmqpTemplate.send(replyToRoutingKey.replaceFirst("queues/", ""), replyMessage);
161+
Publisher publisher = this.rabbitAmqpTemplate.getPublisher();
162+
com.rabbitmq.client.amqp.Message amqpMessage = publisher.message();
163+
RabbitAmqpUtils.toAmqpMessage(replyMessage, amqpMessage);
164+
amqpMessage.to("/queues/" + replyToRoutingKey.replaceFirst("queues/", ""));
165+
sendFuture = new CompletableFuture<>();
166+
publisher.publish(amqpMessage, (context) -> {
167+
switch (context.status()) {
168+
case ACCEPTED -> sendFuture.complete(true);
169+
case REJECTED, RELEASED -> sendFuture.completeExceptionally(
170+
new AmqpNackReceivedException("The message was rejected", messageIn));
171+
}
172+
});
160173
}
161174

162175
sendFuture.join();

0 commit comments

Comments
 (0)