diff --git a/spring-integration-jms/src/main/java/org/springframework/integration/jms/ChannelPublishingJmsMessageListener.java b/spring-integration-jms/src/main/java/org/springframework/integration/jms/ChannelPublishingJmsMessageListener.java index 21fcf5753a..23f6bebffe 100644 --- a/spring-integration-jms/src/main/java/org/springframework/integration/jms/ChannelPublishingJmsMessageListener.java +++ b/spring-integration-jms/src/main/java/org/springframework/integration/jms/ChannelPublishingJmsMessageListener.java @@ -26,6 +26,7 @@ import jakarta.jms.JMSException; import jakarta.jms.MessageProducer; import jakarta.jms.Session; +import org.jspecify.annotations.Nullable; import org.springframework.beans.BeansException; import org.springframework.beans.factory.BeanFactory; @@ -56,7 +57,6 @@ import org.springframework.jms.support.converter.SimpleMessageConverter; import org.springframework.jms.support.destination.DestinationResolver; import org.springframework.jms.support.destination.DynamicDestinationResolver; -import org.springframework.lang.Nullable; import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessagingException; @@ -96,9 +96,9 @@ public class ChannelPublishingJmsMessageListener private boolean extractReplyPayload = true; - private Object defaultReplyDestination; + private @Nullable Object defaultReplyDestination; - private String correlationKey; + private @Nullable String correlationKey; private long replyTimeToLive = jakarta.jms.Message.DEFAULT_TIME_TO_LIVE; @@ -110,14 +110,16 @@ public class ChannelPublishingJmsMessageListener private DestinationResolver destinationResolver = new DynamicDestinationResolver(); - private Expression replyToExpression; + private @Nullable Expression replyToExpression; private JmsHeaderMapper headerMapper = new DefaultJmsHeaderMapper(); + @SuppressWarnings("NullAway.Init") private BeanFactory beanFactory; private MessageBuilderFactory messageBuilderFactory = new DefaultMessageBuilderFactory(); + @SuppressWarnings("NullAway.Init") private StandardEvaluationContext evaluationContext; /** @@ -174,7 +176,7 @@ public void setShouldTrack(boolean shouldTrack) { } @Override - public String getComponentName() { + public @Nullable String getComponentName() { return this.gatewayDelegate.getComponentName(); } @@ -563,7 +565,7 @@ else if (replyToValue instanceof String destinationName) { * @see #setDefaultReplyTopicName * @see #setDestinationResolver */ - private Destination resolveDefaultReplyDestination(Session session) throws JMSException { + private @Nullable Destination resolveDefaultReplyDestination(Session session) throws JMSException { if (this.defaultReplyDestination instanceof Destination destination) { return destination; } @@ -600,7 +602,7 @@ private record DestinationNameHolder(String name, boolean isTopic) { private class GatewayDelegate extends MessagingGatewaySupport { - private static final ThreadLocal ATTRIBUTES_HOLDER = new ThreadLocal<>(); + private static final ThreadLocal<@Nullable AttributeAccessor> ATTRIBUTES_HOLDER = new ThreadLocal<>(); @Nullable private RetryOperations retryTemplate; @@ -621,7 +623,9 @@ private void send(jakarta.jms.Message jmsMessage, Message requestMessage) { else { this.retryTemplate.execute( context -> { - StaticMessageHeaderAccessor.getDeliveryAttempt(requestMessage).incrementAndGet(); + var deliveryAttempt = StaticMessageHeaderAccessor.getDeliveryAttempt(requestMessage); + Assert.notNull(deliveryAttempt, "deliveryAttempt must not be null"); + deliveryAttempt.incrementAndGet(); setAttributesIfNecessary(jmsMessage, requestMessage); send(requestMessage); return null; @@ -635,7 +639,7 @@ private void send(jakarta.jms.Message jmsMessage, Message requestMessage) { } } - private Message sendAndReceiveMessage(jakarta.jms.Message jmsMessage, Message requestMessage) { + private @Nullable Message sendAndReceiveMessage(jakarta.jms.Message jmsMessage, Message requestMessage) { try { if (this.retryTemplate == null) { setAttributesIfNecessary(jmsMessage, requestMessage); @@ -644,7 +648,9 @@ private Message sendAndReceiveMessage(jakarta.jms.Message jmsMessage, Message else { return this.retryTemplate.execute( context -> { - StaticMessageHeaderAccessor.getDeliveryAttempt(requestMessage).incrementAndGet(); + var deliveryAttempt = StaticMessageHeaderAccessor.getDeliveryAttempt(requestMessage); + Assert.notNull(deliveryAttempt, "deliveryAttempt must not be null"); + deliveryAttempt.incrementAndGet(); setAttributesIfNecessary(jmsMessage, requestMessage); return sendAndReceiveMessage(requestMessage); }, this.recoveryCallback); diff --git a/spring-integration-jms/src/main/java/org/springframework/integration/jms/DynamicJmsTemplate.java b/spring-integration-jms/src/main/java/org/springframework/integration/jms/DynamicJmsTemplate.java index c025dfc001..11e2fe34e7 100644 --- a/spring-integration-jms/src/main/java/org/springframework/integration/jms/DynamicJmsTemplate.java +++ b/spring-integration-jms/src/main/java/org/springframework/integration/jms/DynamicJmsTemplate.java @@ -17,6 +17,7 @@ package org.springframework.integration.jms; import jakarta.jms.ConnectionFactory; +import org.jspecify.annotations.Nullable; import org.springframework.jms.connection.CachingConnectionFactory; import org.springframework.jms.core.JmsTemplate; @@ -51,7 +52,7 @@ public void setReceiveTimeout(long receiveTimeout) { } @Override - public void setConnectionFactory(ConnectionFactory connectionFactory) { + public void setConnectionFactory(@Nullable ConnectionFactory connectionFactory) { super.setConnectionFactory(connectionFactory); if (!this.receiveTimeoutExplicitlySet) { if (connectionFactory instanceof CachingConnectionFactory cachingConnectionFactory && diff --git a/spring-integration-jms/src/main/java/org/springframework/integration/jms/DynamicJmsTemplateProperties.java b/spring-integration-jms/src/main/java/org/springframework/integration/jms/DynamicJmsTemplateProperties.java index 594811a96d..0e543bfe94 100644 --- a/spring-integration-jms/src/main/java/org/springframework/integration/jms/DynamicJmsTemplateProperties.java +++ b/spring-integration-jms/src/main/java/org/springframework/integration/jms/DynamicJmsTemplateProperties.java @@ -16,6 +16,8 @@ package org.springframework.integration.jms; +import org.jspecify.annotations.Nullable; + /** * @author Mark Fisher * @author Artem Bilan @@ -24,22 +26,22 @@ */ abstract class DynamicJmsTemplateProperties { - private static final ThreadLocal PRIORITY_HOLDER = new ThreadLocal<>(); + private static final ThreadLocal<@Nullable Integer> PRIORITY_HOLDER = new ThreadLocal<>(); - private static final ThreadLocal RECEIVE_TIMEOUT_HOLDER = new ThreadLocal<>(); + private static final ThreadLocal<@Nullable Long> RECEIVE_TIMEOUT_HOLDER = new ThreadLocal<>(); - private static final ThreadLocal DELIVER_MODE_HOLDER = new ThreadLocal<>(); + private static final ThreadLocal<@Nullable Integer> DELIVER_MODE_HOLDER = new ThreadLocal<>(); - private static final ThreadLocal TIME_TO_LIVE_HOLDER = new ThreadLocal<>(); + private static final ThreadLocal<@Nullable Long> TIME_TO_LIVE_HOLDER = new ThreadLocal<>(); private DynamicJmsTemplateProperties() { } - public static Integer getPriority() { + public static @Nullable Integer getPriority() { return PRIORITY_HOLDER.get(); } - public static void setPriority(Integer priority) { + public static void setPriority(@Nullable Integer priority) { PRIORITY_HOLDER.set(priority); } @@ -47,7 +49,7 @@ public static void clearPriority() { PRIORITY_HOLDER.remove(); } - public static Long getReceiveTimeout() { + public static @Nullable Long getReceiveTimeout() { return RECEIVE_TIMEOUT_HOLDER.get(); } @@ -59,7 +61,7 @@ public static void clearReceiveTimeout() { RECEIVE_TIMEOUT_HOLDER.remove(); } - public static Integer getDeliveryMode() { + public static @Nullable Integer getDeliveryMode() { return DELIVER_MODE_HOLDER.get(); } @@ -71,7 +73,7 @@ public static void clearDeliveryMode() { DELIVER_MODE_HOLDER.remove(); } - public static Long getTimeToLive() { + public static @Nullable Long getTimeToLive() { return TIME_TO_LIVE_HOLDER.get(); } diff --git a/spring-integration-jms/src/main/java/org/springframework/integration/jms/JmsDestinationPollingSource.java b/spring-integration-jms/src/main/java/org/springframework/integration/jms/JmsDestinationPollingSource.java index 38433e4dec..3aa141f8b7 100644 --- a/spring-integration-jms/src/main/java/org/springframework/integration/jms/JmsDestinationPollingSource.java +++ b/spring-integration-jms/src/main/java/org/springframework/integration/jms/JmsDestinationPollingSource.java @@ -19,6 +19,7 @@ import java.util.Map; import jakarta.jms.Destination; +import org.jspecify.annotations.Nullable; import org.springframework.integration.endpoint.AbstractMessageSource; import org.springframework.integration.jms.util.JmsAdapterUtils; @@ -43,15 +44,15 @@ public class JmsDestinationPollingSource extends AbstractMessageSource { private final JmsTemplate jmsTemplate; - private volatile Destination destination; + private volatile @Nullable Destination destination; - private volatile String destinationName; + private volatile @Nullable String destinationName; - private volatile String messageSelector; + private volatile @Nullable String messageSelector; private volatile JmsHeaderMapper headerMapper = new DefaultJmsHeaderMapper(); - private volatile String sessionAcknowledgeMode; + private volatile @Nullable String sessionAcknowledgeMode; private volatile boolean extractPayload = true; @@ -121,7 +122,7 @@ protected void onInit() { * {@link JmsHeaderMapper} instance to map JMS properties to the MessageHeaders. */ @Override - protected Object doReceive() { + protected @Nullable Object doReceive() { jakarta.jms.Message jmsMessage = doReceiveJmsMessage(); if (jmsMessage == null) { return null; @@ -147,7 +148,7 @@ protected Object doReceive() { } } - private jakarta.jms.Message doReceiveJmsMessage() { + private jakarta.jms.@Nullable Message doReceiveJmsMessage() { jakarta.jms.Message jmsMessage; if (this.destination != null) { jmsMessage = this.jmsTemplate.receiveSelected(this.destination, this.messageSelector); diff --git a/spring-integration-jms/src/main/java/org/springframework/integration/jms/JmsInboundGateway.java b/spring-integration-jms/src/main/java/org/springframework/integration/jms/JmsInboundGateway.java index 21e9692867..ff4df56411 100644 --- a/spring-integration-jms/src/main/java/org/springframework/integration/jms/JmsInboundGateway.java +++ b/spring-integration-jms/src/main/java/org/springframework/integration/jms/JmsInboundGateway.java @@ -16,7 +16,10 @@ package org.springframework.integration.jms; +import java.util.Objects; + import io.micrometer.observation.ObservationRegistry; +import org.jspecify.annotations.Nullable; import org.springframework.beans.BeansException; import org.springframework.beans.factory.BeanFactory; @@ -132,7 +135,7 @@ public void registerObservationRegistry(ObservationRegistry observationRegistry) } @Override - public void setObservationConvention(MessageRequestReplyReceiverObservationConvention observationConvention) { + public void setObservationConvention(@Nullable MessageRequestReplyReceiverObservationConvention observationConvention) { super.setObservationConvention(observationConvention); this.endpoint.getListener().setRequestReplyObservationConvention(observationConvention); } @@ -156,7 +159,7 @@ public void setBeanFactory(BeanFactory beanFactory) { @Override protected void onInit() { - this.endpoint.setComponentName(getComponentName()); + this.endpoint.setComponentName(Objects.requireNonNull(getComponentName())); this.endpoint.afterPropertiesSet(); } diff --git a/spring-integration-jms/src/main/java/org/springframework/integration/jms/JmsMessageDrivenEndpoint.java b/spring-integration-jms/src/main/java/org/springframework/integration/jms/JmsMessageDrivenEndpoint.java index 94962d5195..c87938a5be 100644 --- a/spring-integration-jms/src/main/java/org/springframework/integration/jms/JmsMessageDrivenEndpoint.java +++ b/spring-integration-jms/src/main/java/org/springframework/integration/jms/JmsMessageDrivenEndpoint.java @@ -16,7 +16,10 @@ package org.springframework.integration.jms; +import java.util.Objects; + import io.micrometer.observation.ObservationRegistry; +import org.jspecify.annotations.Nullable; import org.springframework.beans.BeansException; import org.springframework.context.ApplicationContext; @@ -48,7 +51,7 @@ public class JmsMessageDrivenEndpoint extends MessageProducerSupport implements private final ChannelPublishingJmsMessageListener listener; - private String sessionAcknowledgeMode; + private @Nullable String sessionAcknowledgeMode; private boolean shutdownContainerOnStop = true; @@ -175,7 +178,7 @@ public void registerObservationRegistry(ObservationRegistry observationRegistry) } @Override - public void setObservationConvention(MessageReceiverObservationConvention observationConvention) { + public void setObservationConvention(@Nullable MessageReceiverObservationConvention observationConvention) { super.setObservationConvention(observationConvention); this.listener.setReceiverObservationConvention(observationConvention); } @@ -218,7 +221,7 @@ protected void onInit() { this.listenerContainer.setSessionAcknowledgeMode(acknowledgeMode); } } - this.listener.setComponentName(getComponentName()); + this.listener.setComponentName(Objects.requireNonNull(getComponentName())); } @Override diff --git a/spring-integration-jms/src/main/java/org/springframework/integration/jms/JmsOutboundGateway.java b/spring-integration-jms/src/main/java/org/springframework/integration/jms/JmsOutboundGateway.java index 8090e0f8ba..72cf4213c7 100644 --- a/spring-integration-jms/src/main/java/org/springframework/integration/jms/JmsOutboundGateway.java +++ b/spring-integration-jms/src/main/java/org/springframework/integration/jms/JmsOutboundGateway.java @@ -46,6 +46,7 @@ import jakarta.jms.Topic; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.jspecify.annotations.Nullable; import org.springframework.beans.factory.BeanFactory; import org.springframework.core.convert.ConversionService; @@ -110,17 +111,17 @@ public class JmsOutboundGateway extends AbstractReplyProducingMessageHandler private final Lock lifeCycleMonitor = new ReentrantLock(); - private Destination requestDestination; + private @Nullable Destination requestDestination; - private String requestDestinationName; + private @Nullable String requestDestinationName; - private ExpressionEvaluatingMessageProcessor requestDestinationExpressionProcessor; + private @Nullable ExpressionEvaluatingMessageProcessor requestDestinationExpressionProcessor; - private Destination replyDestination; + private @Nullable Destination replyDestination; - private String replyDestinationName; + private @Nullable String replyDestinationName; - private ExpressionEvaluatingMessageProcessor replyDestinationExpressionProcessor; + private @Nullable ExpressionEvaluatingMessageProcessor replyDestinationExpressionProcessor; private DestinationResolver destinationResolver = new DynamicDestinationResolver(); @@ -138,21 +139,22 @@ public class JmsOutboundGateway extends AbstractReplyProducingMessageHandler private boolean explicitQosEnabled; + @SuppressWarnings("NullAway.Init") private ConnectionFactory connectionFactory; private MessageConverter messageConverter = new SimpleMessageConverter(); private JmsHeaderMapper headerMapper = new DefaultJmsHeaderMapper(); - private String correlationKey; + private @Nullable String correlationKey; private boolean extractRequestPayload = true; private boolean extractReplyPayload = true; - private GatewayReplyListenerContainer replyContainer; + private @Nullable GatewayReplyListenerContainer replyContainer; - private ReplyContainerProperties replyContainerProperties; + private @Nullable ReplyContainerProperties replyContainerProperties; private boolean useReplyContainer; @@ -164,11 +166,11 @@ public class JmsOutboundGateway extends AbstractReplyProducingMessageHandler private volatile boolean initialized; - private volatile ScheduledFuture reaper; + private volatile @Nullable ScheduledFuture reaper; private volatile boolean wasStopped; - private volatile ScheduledFuture idleTask; + private volatile @Nullable ScheduledFuture idleTask; private volatile long lastSend; @@ -638,8 +640,7 @@ private void applyReplyContainerProperties(GatewayReplyListenerContainer contain container::setRecoveryInterval) .acceptIfHasText(this.replyContainerProperties.getSessionAcknowledgeModeName(), acknowledgeModeName -> { - Integer acknowledgeMode = JmsAdapterUtils.parseAcknowledgeMode( - this.replyContainerProperties.getSessionAcknowledgeModeName()); + Integer acknowledgeMode = JmsAdapterUtils.parseAcknowledgeMode(acknowledgeModeName); if (acknowledgeMode != null) { if (JmsAdapterUtils.SESSION_TRANSACTED == acknowledgeMode) { container.setSessionTransacted(true); @@ -732,7 +733,7 @@ public boolean isRunning() { } @Override - protected Object handleRequestMessage(final Message requestMessage) { + protected @Nullable Object handleRequestMessage(final Message requestMessage) { if (!this.initialized) { afterPropertiesSet(); } @@ -751,7 +752,7 @@ protected Object handleRequestMessage(final Message requestMessage) { this.replyContainer.start(); this.idleTask = getTaskScheduler() - .scheduleAtFixedRate(new IdleContainerStopper(), + .scheduleAtFixedRate(new IdleContainerStopper(this.replyContainer), Duration.ofMillis(this.idleReplyContainerTimeout / 2)); } } @@ -807,7 +808,8 @@ private AbstractIntegrationMessageBuilder buildReply(jakarta.jms.Message jmsR } } - private Object sendAndReceiveWithContainer(Message requestMessage) throws JMSException { + @SuppressWarnings("NullAway") // Dataflow analysis limitation + private @Nullable Object sendAndReceiveWithContainer(Message requestMessage) throws JMSException { Connection connection = createConnection(); // NOSONAR - closed in ConnectionFactoryUtils. Session session = null; Destination replyTo = this.replyContainer.getReplyDestination(); @@ -861,7 +863,7 @@ private Object sendAndReceiveWithContainer(Message requestMessage) throws JMS } } - private jakarta.jms.Message sendAndReceiveWithoutContainer(Message requestMessage) throws JMSException { + private jakarta.jms.@Nullable Message sendAndReceiveWithoutContainer(Message requestMessage) throws JMSException { Connection connection = createConnection(); // NOSONAR - closed in ConnectionFactoryUtils. Session session = null; Destination replyTo = null; @@ -915,7 +917,7 @@ else if (replyTo instanceof TemporaryQueue || replyTo instanceof TemporaryTopic) * Creates the MessageConsumer before sending the request Message since we are generating * our own correlationId value for the MessageSelector. */ - private jakarta.jms.Message doSendAndReceiveWithGeneratedCorrelationId(Destination reqDestination, + private jakarta.jms.@Nullable Message doSendAndReceiveWithGeneratedCorrelationId(Destination reqDestination, jakarta.jms.Message jmsRequest, Destination replyTo, Session session, int priority) throws JMSException { MessageProducer messageProducer = null; try { @@ -949,7 +951,7 @@ private jakarta.jms.Message doSendAndReceiveWithGeneratedCorrelationId(Destinati /** * Creates the MessageConsumer before sending the request Message since we do not need any correlation. */ - private jakarta.jms.Message doSendAndReceiveWithTemporaryReplyToDestination(Destination reqDestination, + private jakarta.jms.@Nullable Message doSendAndReceiveWithTemporaryReplyToDestination(Destination reqDestination, jakarta.jms.Message jmsRequest, Destination replyTo, Session session, int priority) throws JMSException { MessageProducer messageProducer = null; @@ -970,7 +972,7 @@ private jakarta.jms.Message doSendAndReceiveWithTemporaryReplyToDestination(Dest * Creates the MessageConsumer after sending the request Message since we need * the MessageID for correlation with a MessageSelector. */ - private jakarta.jms.Message doSendAndReceiveWithMessageIdCorrelation(Destination reqDestination, + private jakarta.jms.@Nullable Message doSendAndReceiveWithMessageIdCorrelation(Destination reqDestination, jakarta.jms.Message jmsRequest, Destination replyTo, Session session, int priority) throws JMSException { if (replyTo instanceof Topic) { @@ -1007,7 +1009,7 @@ private jakarta.jms.Message doSendAndReceiveWithMessageIdCorrelation(Destination * If the replyTo is not temporary, and the connection is lost while waiting for a reply, reconnect for * up to receiveTimeout. */ - private jakarta.jms.Message retryableReceiveReply(Session session, Destination replyTo, // NOSONAR + private jakarta.jms.@Nullable Message retryableReceiveReply(Session session, Destination replyTo, // NOSONAR String messageSelector) throws JMSException { Connection consumerConnection = null; //NOSONAR @@ -1071,7 +1073,8 @@ private jakarta.jms.Message retryableReceiveReply(Session session, Destination r } } - private Object doSendAndReceiveAsync(Destination reqDestination, jakarta.jms.Message jmsRequest, Session session, + @SuppressWarnings("NullAway") // Dataflow analysis limitation + private @Nullable Object doSendAndReceiveAsync(Destination reqDestination, jakarta.jms.Message jmsRequest, Session session, int priority) throws JMSException { String correlation = null; @@ -1120,7 +1123,7 @@ private Object doSendAndReceiveAsync(Destination reqDestination, jakarta.jms.Mes } } - private jakarta.jms.Message doSendAndReceiveAsyncDefaultCorrelation(Destination reqDestination, + private jakarta.jms.@Nullable Message doSendAndReceiveAsyncDefaultCorrelation(Destination reqDestination, jakarta.jms.Message jmsRequest, Session session, int priority) throws JMSException { String correlation = null; @@ -1162,7 +1165,7 @@ private jakarta.jms.Message doSendAndReceiveAsyncDefaultCorrelation(Destination } } - private jakarta.jms.Message obtainReplyFromContainer(String correlationId, + private jakarta.jms.@Nullable Message obtainReplyFromContainer(String correlationId, LinkedBlockingQueue replyQueue) { jakarta.jms.Message reply = null; @@ -1228,7 +1231,7 @@ private void sendRequestMessage(jakarta.jms.Message jmsRequest, MessageProducer } } - private jakarta.jms.Message receiveReplyMessage(MessageConsumer messageConsumer) throws JMSException { + private jakarta.jms.@Nullable Message receiveReplyMessage(MessageConsumer messageConsumer) throws JMSException { return (this.receiveTimeout >= 0) ? messageConsumer.receive(this.receiveTimeout) : messageConsumer.receive(); } @@ -1237,7 +1240,7 @@ private jakarta.jms.Message receiveReplyMessage(MessageConsumer messageConsumer) * Ignores any other {@link Destination} type and also ignores any * {@link JMSException}s that may be thrown when attempting to delete. */ - private void deleteDestinationIfTemporary(Destination destination) { + private void deleteDestinationIfTemporary(@Nullable Destination destination) { try { if (destination instanceof TemporaryQueue temporaryQueue) { temporaryQueue.delete(); @@ -1345,7 +1348,7 @@ private void onMessageSync(jakarta.jms.Message message, String correlationId) { private static class GatewayReplyListenerContainer extends DefaultMessageListenerContainer { - private volatile Destination replyDestination; + private volatile @Nullable Destination replyDestination; GatewayReplyListenerContainer() { } @@ -1491,7 +1494,10 @@ public void run() { private class IdleContainerStopper implements Runnable { - IdleContainerStopper() { + private final GatewayReplyListenerContainer replyContainer; + + IdleContainerStopper(GatewayReplyListenerContainer replyContainer) { + this.replyContainer = replyContainer; } @Override @@ -1500,12 +1506,15 @@ public void run() { try { if (System.currentTimeMillis() - JmsOutboundGateway.this.lastSend > JmsOutboundGateway.this.idleReplyContainerTimeout - && JmsOutboundGateway.this.replies.size() == 0 && - JmsOutboundGateway.this.replyContainer.isRunning()) { + && JmsOutboundGateway.this.replies.isEmpty() && + this.replyContainer.isRunning()) { logger.debug(() -> getComponentName() + ": Stopping idle reply container."); - JmsOutboundGateway.this.replyContainer.stop(); - JmsOutboundGateway.this.idleTask.cancel(false); + this.replyContainer.stop(); + ScheduledFuture idleTask = JmsOutboundGateway.this.idleTask; + if (idleTask != null) { + idleTask.cancel(false); + } JmsOutboundGateway.this.idleTask = null; } } @@ -1518,31 +1527,31 @@ public void run() { public static class ReplyContainerProperties { - private Boolean sessionTransacted; + private @Nullable Boolean sessionTransacted; - private Integer sessionAcknowledgeMode; + private @Nullable Integer sessionAcknowledgeMode; - private String sessionAcknowledgeModeName; + private @Nullable String sessionAcknowledgeModeName; - private Long receiveTimeout; + private @Nullable Long receiveTimeout; - private Long recoveryInterval; + private @Nullable Long recoveryInterval; - private Integer cacheLevel; + private @Nullable Integer cacheLevel; - private Integer concurrentConsumers; + private @Nullable Integer concurrentConsumers; - private Integer maxConcurrentConsumers; + private @Nullable Integer maxConcurrentConsumers; - private Integer maxMessagesPerTask; + private @Nullable Integer maxMessagesPerTask; - private Integer idleConsumerLimit; + private @Nullable Integer idleConsumerLimit; - private Integer idleTaskExecutionLimit; + private @Nullable Integer idleTaskExecutionLimit; - private Executor taskExecutor; + private @Nullable Executor taskExecutor; - public String getSessionAcknowledgeModeName() { + public @Nullable String getSessionAcknowledgeModeName() { return this.sessionAcknowledgeModeName; } @@ -1550,7 +1559,7 @@ public void setSessionAcknowledgeModeName(String sessionAcknowledgeModeName) { this.sessionAcknowledgeModeName = sessionAcknowledgeModeName; } - public Boolean isSessionTransacted() { + public @Nullable Boolean isSessionTransacted() { return this.sessionTransacted; } @@ -1558,7 +1567,7 @@ public void setSessionTransacted(Boolean sessionTransacted) { this.sessionTransacted = sessionTransacted; } - public Integer getSessionAcknowledgeMode() { + public @Nullable Integer getSessionAcknowledgeMode() { return this.sessionAcknowledgeMode; } @@ -1566,7 +1575,7 @@ public void setSessionAcknowledgeMode(Integer sessionAcknowledgeMode) { this.sessionAcknowledgeMode = sessionAcknowledgeMode; } - public Long getReceiveTimeout() { + public @Nullable Long getReceiveTimeout() { return this.receiveTimeout; } @@ -1574,7 +1583,7 @@ public void setReceiveTimeout(Long receiveTimeout) { this.receiveTimeout = receiveTimeout; } - public Long getRecoveryInterval() { + public @Nullable Long getRecoveryInterval() { return this.recoveryInterval; } @@ -1582,7 +1591,7 @@ public void setRecoveryInterval(Long recoveryInterval) { this.recoveryInterval = recoveryInterval; } - public Integer getCacheLevel() { + public @Nullable Integer getCacheLevel() { return this.cacheLevel; } @@ -1590,7 +1599,7 @@ public void setCacheLevel(Integer cacheLevel) { this.cacheLevel = cacheLevel; } - public Integer getConcurrentConsumers() { + public @Nullable Integer getConcurrentConsumers() { return this.concurrentConsumers; } @@ -1598,7 +1607,7 @@ public void setConcurrentConsumers(Integer concurrentConsumers) { this.concurrentConsumers = concurrentConsumers; } - public Integer getMaxConcurrentConsumers() { + public @Nullable Integer getMaxConcurrentConsumers() { return this.maxConcurrentConsumers; } @@ -1606,7 +1615,7 @@ public void setMaxConcurrentConsumers(Integer maxConcurrentConsumers) { this.maxConcurrentConsumers = maxConcurrentConsumers; } - public Integer getMaxMessagesPerTask() { + public @Nullable Integer getMaxMessagesPerTask() { return this.maxMessagesPerTask; } @@ -1614,7 +1623,7 @@ public void setMaxMessagesPerTask(Integer maxMessagesPerTask) { this.maxMessagesPerTask = maxMessagesPerTask; } - public Integer getIdleConsumerLimit() { + public @Nullable Integer getIdleConsumerLimit() { return this.idleConsumerLimit; } @@ -1622,7 +1631,7 @@ public void setIdleConsumerLimit(Integer idleConsumerLimit) { this.idleConsumerLimit = idleConsumerLimit; } - public Integer getIdleTaskExecutionLimit() { + public @Nullable Integer getIdleTaskExecutionLimit() { return this.idleTaskExecutionLimit; } @@ -1634,7 +1643,7 @@ public void setTaskExecutor(Executor taskExecutor) { this.taskExecutor = taskExecutor; } - public Executor getTaskExecutor() { + public @Nullable Executor getTaskExecutor() { return this.taskExecutor; } diff --git a/spring-integration-jms/src/main/java/org/springframework/integration/jms/JmsSendingMessageHandler.java b/spring-integration-jms/src/main/java/org/springframework/integration/jms/JmsSendingMessageHandler.java index 216bf91f97..a9f68efc1c 100644 --- a/spring-integration-jms/src/main/java/org/springframework/integration/jms/JmsSendingMessageHandler.java +++ b/spring-integration-jms/src/main/java/org/springframework/integration/jms/JmsSendingMessageHandler.java @@ -17,6 +17,7 @@ package org.springframework.integration.jms; import jakarta.jms.Destination; +import org.jspecify.annotations.Nullable; import org.springframework.beans.factory.BeanFactory; import org.springframework.core.convert.ConversionService; @@ -43,20 +44,21 @@ public class JmsSendingMessageHandler extends AbstractMessageHandler { private final JmsTemplate jmsTemplate; - private Destination destination; + private @Nullable Destination destination; - private String destinationName; + private @Nullable String destinationName; private JmsHeaderMapper headerMapper = new DefaultJmsHeaderMapper(); private boolean extractPayload = true; - private ExpressionEvaluatingMessageProcessor destinationExpressionProcessor; + private @Nullable ExpressionEvaluatingMessageProcessor destinationExpressionProcessor; - private Expression deliveryModeExpression; + private @Nullable Expression deliveryModeExpression; - private Expression timeToLiveExpression; + private @Nullable Expression timeToLiveExpression; + @SuppressWarnings("NullAway.Init") private EvaluationContext evaluationContext; public JmsSendingMessageHandler(JmsTemplate jmsTemplate) { @@ -192,7 +194,7 @@ protected void handleMessageInternal(final Message message) { } } - private Object determineDestination(Message message) { + private @Nullable Object determineDestination(Message message) { if (this.destination != null) { return this.destination; } @@ -211,7 +213,7 @@ private Object determineDestination(Message message) { return null; } - private void send(Object destination, Object objectToSend, MessagePostProcessor messagePostProcessor) { + private void send(@Nullable Object destination, Object objectToSend, MessagePostProcessor messagePostProcessor) { if (destination instanceof Destination destinationObj) { this.jmsTemplate.convertAndSend(destinationObj, objectToSend, messagePostProcessor); } diff --git a/spring-integration-jms/src/main/java/org/springframework/integration/jms/PollableJmsChannel.java b/spring-integration-jms/src/main/java/org/springframework/integration/jms/PollableJmsChannel.java index a7b8bc5cd3..f93fc09c89 100644 --- a/spring-integration-jms/src/main/java/org/springframework/integration/jms/PollableJmsChannel.java +++ b/spring-integration-jms/src/main/java/org/springframework/integration/jms/PollableJmsChannel.java @@ -20,11 +20,12 @@ import java.util.Deque; import java.util.List; +import org.jspecify.annotations.Nullable; + import org.springframework.integration.channel.ExecutorChannelInterceptorAware; import org.springframework.integration.support.management.metrics.CounterFacade; import org.springframework.integration.support.management.metrics.MetricsCaptor; import org.springframework.jms.core.JmsTemplate; -import org.springframework.lang.Nullable; import org.springframework.messaging.Message; import org.springframework.messaging.PollableChannel; import org.springframework.messaging.support.ChannelInterceptor; @@ -44,9 +45,9 @@ public class PollableJmsChannel extends AbstractJmsChannel implements PollableChannel, ExecutorChannelInterceptorAware { - private String messageSelector; + private @Nullable String messageSelector; - private CounterFacade receiveCounter; + private @Nullable CounterFacade receiveCounter; private volatile int executorInterceptorsSize; @@ -195,7 +196,6 @@ public boolean removeInterceptor(ChannelInterceptor interceptor) { } @Override - @Nullable public ChannelInterceptor removeInterceptor(int index) { ChannelInterceptor interceptor = super.removeInterceptor(index); if (interceptor instanceof ExecutorChannelInterceptor) { diff --git a/spring-integration-jms/src/main/java/org/springframework/integration/jms/SubscribableJmsChannel.java b/spring-integration-jms/src/main/java/org/springframework/integration/jms/SubscribableJmsChannel.java index 34f9c9bd34..baacdd1266 100644 --- a/spring-integration-jms/src/main/java/org/springframework/integration/jms/SubscribableJmsChannel.java +++ b/spring-integration-jms/src/main/java/org/springframework/integration/jms/SubscribableJmsChannel.java @@ -54,10 +54,12 @@ public class SubscribableJmsChannel extends AbstractJmsChannel private final AbstractMessageListenerContainer container; + @SuppressWarnings("NullAway.Init") private volatile AbstractDispatcher dispatcher; private volatile boolean initialized; + @SuppressWarnings("NullAway.Init") private volatile Integer maxSubscribers; public SubscribableJmsChannel(AbstractMessageListenerContainer container, JmsTemplate jmsTemplate) { @@ -207,6 +209,7 @@ private static final class DispatchingMessageListener implements MessageListener this.messageBuilderFactory = messageBuilderFactory; } + @SuppressWarnings("NullAway") // Dataflow analysis limitation @Override public void onMessage(jakarta.jms.Message message) { Message messageToSend = null; diff --git a/spring-integration-jms/src/main/java/org/springframework/integration/jms/config/JmsChannelFactoryBean.java b/spring-integration-jms/src/main/java/org/springframework/integration/jms/config/JmsChannelFactoryBean.java index e114fe647f..e865f192df 100644 --- a/spring-integration-jms/src/main/java/org/springframework/integration/jms/config/JmsChannelFactoryBean.java +++ b/spring-integration-jms/src/main/java/org/springframework/integration/jms/config/JmsChannelFactoryBean.java @@ -23,6 +23,7 @@ import jakarta.jms.Destination; import jakarta.jms.ExceptionListener; import jakarta.jms.Session; +import org.jspecify.annotations.Nullable; import org.springframework.beans.BeanUtils; import org.springframework.beans.factory.BeanFactory; @@ -60,64 +61,66 @@ public class JmsChannelFactoryBean extends AbstractFactoryBean implements SmartLifecycle, BeanNameAware { + @SuppressWarnings("NullAway.Init") private volatile AbstractJmsChannel channel; - private volatile List interceptors; + private volatile @Nullable List interceptors; private final boolean messageDriven; private final JmsTemplate jmsTemplate = new DynamicJmsTemplate(); - private Class containerType; + private @Nullable Class containerType; private boolean acceptMessagesWhileStopping; private boolean autoStartup = true; - private String cacheLevelName; + private @Nullable String cacheLevelName; - private Integer cacheLevel; + private @Nullable Integer cacheLevel; - private String clientId; + private @Nullable String clientId; - private String concurrency; + private @Nullable String concurrency; - private Integer concurrentConsumers; + private @Nullable Integer concurrentConsumers; - private ConnectionFactory connectionFactory; + private @Nullable ConnectionFactory connectionFactory; - private Destination destination; + private @Nullable Destination destination; - private String destinationName; + private @Nullable String destinationName; - private DestinationResolver destinationResolver; + private @Nullable DestinationResolver destinationResolver; - private String durableSubscriptionName; + private @Nullable String durableSubscriptionName; - private ErrorHandler errorHandler; + private @Nullable ErrorHandler errorHandler; - private ExceptionListener exceptionListener; + private @Nullable ExceptionListener exceptionListener; - private Boolean exposeListenerSession; + private @Nullable Boolean exposeListenerSession; - private Integer idleTaskExecutionLimit; + private @Nullable Integer idleTaskExecutionLimit; - private Integer maxConcurrentConsumers; + private @Nullable Integer maxConcurrentConsumers; - private Integer maxMessagesPerTask; + private @Nullable Integer maxMessagesPerTask; - private String messageSelector; + private @Nullable String messageSelector; - private Integer phase; + private @Nullable Integer phase; - private Boolean pubSubDomain; + private @Nullable Boolean pubSubDomain; private boolean pubSubNoLocal; - private Long receiveTimeout; + private @Nullable Long receiveTimeout; - private Long recoveryInterval; + private @Nullable Long recoveryInterval; + @SuppressWarnings("NullAway.Init") private String beanName; private boolean subscriptionShared; @@ -134,13 +137,13 @@ public class JmsChannelFactoryBean extends AbstractFactoryBean