Skip to content

Apply Nullability to JMS module #10194

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Jul 8, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import jakarta.jms.JMSException;
import jakarta.jms.MessageProducer;
import jakarta.jms.Session;
import org.jspecify.annotations.Nullable;

import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactory;
Expand Down Expand Up @@ -56,7 +57,6 @@
import org.springframework.jms.support.converter.SimpleMessageConverter;
import org.springframework.jms.support.destination.DestinationResolver;
import org.springframework.jms.support.destination.DynamicDestinationResolver;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessagingException;
Expand Down Expand Up @@ -96,9 +96,9 @@ public class ChannelPublishingJmsMessageListener

private boolean extractReplyPayload = true;

private Object defaultReplyDestination;
private @Nullable Object defaultReplyDestination;

private String correlationKey;
private @Nullable String correlationKey;

private long replyTimeToLive = jakarta.jms.Message.DEFAULT_TIME_TO_LIVE;

Expand All @@ -110,14 +110,16 @@ public class ChannelPublishingJmsMessageListener

private DestinationResolver destinationResolver = new DynamicDestinationResolver();

private Expression replyToExpression;
private @Nullable Expression replyToExpression;

private JmsHeaderMapper headerMapper = new DefaultJmsHeaderMapper();

@SuppressWarnings("NullAway.Init")
private BeanFactory beanFactory;

private MessageBuilderFactory messageBuilderFactory = new DefaultMessageBuilderFactory();

@SuppressWarnings("NullAway.Init")
private StandardEvaluationContext evaluationContext;

/**
Expand Down Expand Up @@ -174,7 +176,7 @@ public void setShouldTrack(boolean shouldTrack) {
}

@Override
public String getComponentName() {
public @Nullable String getComponentName() {
return this.gatewayDelegate.getComponentName();
}

Expand Down Expand Up @@ -563,7 +565,7 @@ else if (replyToValue instanceof String destinationName) {
* @see #setDefaultReplyTopicName
* @see #setDestinationResolver
*/
private Destination resolveDefaultReplyDestination(Session session) throws JMSException {
private @Nullable Destination resolveDefaultReplyDestination(Session session) throws JMSException {
if (this.defaultReplyDestination instanceof Destination destination) {
return destination;
}
Expand Down Expand Up @@ -600,7 +602,7 @@ private record DestinationNameHolder(String name, boolean isTopic) {

private class GatewayDelegate extends MessagingGatewaySupport {

private static final ThreadLocal<AttributeAccessor> ATTRIBUTES_HOLDER = new ThreadLocal<>();
private static final ThreadLocal<@Nullable AttributeAccessor> ATTRIBUTES_HOLDER = new ThreadLocal<>();

@Nullable
private RetryOperations retryTemplate;
Expand All @@ -621,7 +623,9 @@ private void send(jakarta.jms.Message jmsMessage, Message<?> requestMessage) {
else {
this.retryTemplate.execute(
context -> {
StaticMessageHeaderAccessor.getDeliveryAttempt(requestMessage).incrementAndGet();
var deliveryAttempt = StaticMessageHeaderAccessor.getDeliveryAttempt(requestMessage);
Assert.notNull(deliveryAttempt, "deliveryAttempt must not be null");
deliveryAttempt.incrementAndGet();
setAttributesIfNecessary(jmsMessage, requestMessage);
send(requestMessage);
return null;
Expand All @@ -635,7 +639,7 @@ private void send(jakarta.jms.Message jmsMessage, Message<?> requestMessage) {
}
}

private Message<?> sendAndReceiveMessage(jakarta.jms.Message jmsMessage, Message<?> requestMessage) {
private @Nullable Message<?> sendAndReceiveMessage(jakarta.jms.Message jmsMessage, Message<?> requestMessage) {
try {
if (this.retryTemplate == null) {
setAttributesIfNecessary(jmsMessage, requestMessage);
Expand All @@ -644,7 +648,9 @@ private Message<?> sendAndReceiveMessage(jakarta.jms.Message jmsMessage, Message
else {
return this.retryTemplate.execute(
context -> {
StaticMessageHeaderAccessor.getDeliveryAttempt(requestMessage).incrementAndGet();
var deliveryAttempt = StaticMessageHeaderAccessor.getDeliveryAttempt(requestMessage);
Assert.notNull(deliveryAttempt, "deliveryAttempt must not be null");
deliveryAttempt.incrementAndGet();
setAttributesIfNecessary(jmsMessage, requestMessage);
return sendAndReceiveMessage(requestMessage);
}, this.recoveryCallback);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.springframework.integration.jms;

import jakarta.jms.ConnectionFactory;
import org.jspecify.annotations.Nullable;

import org.springframework.jms.connection.CachingConnectionFactory;
import org.springframework.jms.core.JmsTemplate;
Expand Down Expand Up @@ -51,7 +52,7 @@ public void setReceiveTimeout(long receiveTimeout) {
}

@Override
public void setConnectionFactory(ConnectionFactory connectionFactory) {
public void setConnectionFactory(@Nullable ConnectionFactory connectionFactory) {
super.setConnectionFactory(connectionFactory);
if (!this.receiveTimeoutExplicitlySet) {
if (connectionFactory instanceof CachingConnectionFactory cachingConnectionFactory &&
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package org.springframework.integration.jms;

import org.jspecify.annotations.Nullable;

/**
* @author Mark Fisher
* @author Artem Bilan
Expand All @@ -24,30 +26,30 @@
*/
abstract class DynamicJmsTemplateProperties {

private static final ThreadLocal<Integer> PRIORITY_HOLDER = new ThreadLocal<>();
private static final ThreadLocal<@Nullable Integer> PRIORITY_HOLDER = new ThreadLocal<>();

private static final ThreadLocal<Long> RECEIVE_TIMEOUT_HOLDER = new ThreadLocal<>();
private static final ThreadLocal<@Nullable Long> RECEIVE_TIMEOUT_HOLDER = new ThreadLocal<>();

private static final ThreadLocal<Integer> DELIVER_MODE_HOLDER = new ThreadLocal<>();
private static final ThreadLocal<@Nullable Integer> DELIVER_MODE_HOLDER = new ThreadLocal<>();

private static final ThreadLocal<Long> TIME_TO_LIVE_HOLDER = new ThreadLocal<>();
private static final ThreadLocal<@Nullable Long> TIME_TO_LIVE_HOLDER = new ThreadLocal<>();

private DynamicJmsTemplateProperties() {
}

public static Integer getPriority() {
public static @Nullable Integer getPriority() {
return PRIORITY_HOLDER.get();
}

public static void setPriority(Integer priority) {
public static void setPriority(@Nullable Integer priority) {
PRIORITY_HOLDER.set(priority);
}

public static void clearPriority() {
PRIORITY_HOLDER.remove();
}

public static Long getReceiveTimeout() {
public static @Nullable Long getReceiveTimeout() {
return RECEIVE_TIMEOUT_HOLDER.get();
}

Expand All @@ -59,7 +61,7 @@ public static void clearReceiveTimeout() {
RECEIVE_TIMEOUT_HOLDER.remove();
}

public static Integer getDeliveryMode() {
public static @Nullable Integer getDeliveryMode() {
return DELIVER_MODE_HOLDER.get();
}

Expand All @@ -71,7 +73,7 @@ public static void clearDeliveryMode() {
DELIVER_MODE_HOLDER.remove();
}

public static Long getTimeToLive() {
public static @Nullable Long getTimeToLive() {
return TIME_TO_LIVE_HOLDER.get();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.util.Map;

import jakarta.jms.Destination;
import org.jspecify.annotations.Nullable;

import org.springframework.integration.endpoint.AbstractMessageSource;
import org.springframework.integration.jms.util.JmsAdapterUtils;
Expand All @@ -43,15 +44,15 @@ public class JmsDestinationPollingSource extends AbstractMessageSource<Object> {

private final JmsTemplate jmsTemplate;

private volatile Destination destination;
private volatile @Nullable Destination destination;

private volatile String destinationName;
private volatile @Nullable String destinationName;

private volatile String messageSelector;
private volatile @Nullable String messageSelector;

private volatile JmsHeaderMapper headerMapper = new DefaultJmsHeaderMapper();

private volatile String sessionAcknowledgeMode;
private volatile @Nullable String sessionAcknowledgeMode;

private volatile boolean extractPayload = true;

Expand Down Expand Up @@ -121,7 +122,7 @@ protected void onInit() {
* {@link JmsHeaderMapper} instance to map JMS properties to the MessageHeaders.
*/
@Override
protected Object doReceive() {
protected @Nullable Object doReceive() {
jakarta.jms.Message jmsMessage = doReceiveJmsMessage();
if (jmsMessage == null) {
return null;
Expand All @@ -147,7 +148,7 @@ protected Object doReceive() {
}
}

private jakarta.jms.Message doReceiveJmsMessage() {
private jakarta.jms.@Nullable Message doReceiveJmsMessage() {
jakarta.jms.Message jmsMessage;
if (this.destination != null) {
jmsMessage = this.jmsTemplate.receiveSelected(this.destination, this.messageSelector);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@

package org.springframework.integration.jms;

import java.util.Objects;

import io.micrometer.observation.ObservationRegistry;
import org.jspecify.annotations.Nullable;

import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactory;
Expand Down Expand Up @@ -132,7 +135,7 @@ public void registerObservationRegistry(ObservationRegistry observationRegistry)
}

@Override
public void setObservationConvention(MessageRequestReplyReceiverObservationConvention observationConvention) {
public void setObservationConvention(@Nullable MessageRequestReplyReceiverObservationConvention observationConvention) {
super.setObservationConvention(observationConvention);
this.endpoint.getListener().setRequestReplyObservationConvention(observationConvention);
}
Expand All @@ -156,7 +159,7 @@ public void setBeanFactory(BeanFactory beanFactory) {

@Override
protected void onInit() {
this.endpoint.setComponentName(getComponentName());
this.endpoint.setComponentName(Objects.requireNonNull(getComponentName()));
this.endpoint.afterPropertiesSet();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@

package org.springframework.integration.jms;

import java.util.Objects;

import io.micrometer.observation.ObservationRegistry;
import org.jspecify.annotations.Nullable;

import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
Expand Down Expand Up @@ -48,7 +51,7 @@ public class JmsMessageDrivenEndpoint extends MessageProducerSupport implements

private final ChannelPublishingJmsMessageListener listener;

private String sessionAcknowledgeMode;
private @Nullable String sessionAcknowledgeMode;

private boolean shutdownContainerOnStop = true;

Expand Down Expand Up @@ -175,7 +178,7 @@ public void registerObservationRegistry(ObservationRegistry observationRegistry)
}

@Override
public void setObservationConvention(MessageReceiverObservationConvention observationConvention) {
public void setObservationConvention(@Nullable MessageReceiverObservationConvention observationConvention) {
super.setObservationConvention(observationConvention);
this.listener.setReceiverObservationConvention(observationConvention);
}
Expand Down Expand Up @@ -218,7 +221,7 @@ protected void onInit() {
this.listenerContainer.setSessionAcknowledgeMode(acknowledgeMode);
}
}
this.listener.setComponentName(getComponentName());
this.listener.setComponentName(Objects.requireNonNull(getComponentName()));
}

@Override
Expand Down
Loading