Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public final void setErrorMessageStrategy(ErrorMessageStrategy errorMessageStrat
this.errorMessageStrategy = errorMessageStrategy;
}

public final void setChannel(MessageChannel channel) {
public final void setChannel(@Nullable MessageChannel channel) {
this.channel = channel;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import org.jspecify.annotations.Nullable;

import org.springframework.integration.handler.MessageProcessor;
import org.springframework.integration.support.MutableMessageBuilder;
import org.springframework.messaging.Message;

/**
* The {@link org.springframework.integration.core.MessageSource} strategy implementation
Expand All @@ -27,11 +29,15 @@
*
* @author Artem Bilan
* @author Gary Russell
* @author Jiandong Ma
*
* @since 5.0
*/
public class MessageProcessorMessageSource extends AbstractMessageSource<Object> {

// provide a fake message since the contract of processMessage requires a NonNull Message.
static final Message<Object> FAKE_MESSAGE = MutableMessageBuilder.withPayload(new Object(), false).build();

private final MessageProcessor<?> messageProcessor;

public MessageProcessorMessageSource(MessageProcessor<?> messageProcessor) {
Expand All @@ -45,7 +51,7 @@ public String getComponentType() {

@Override
protected @Nullable Object doReceive() {
return this.messageProcessor.processMessage(null);
return this.messageProcessor.processMessage(FAKE_MESSAGE);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ public boolean isRunning() {
}

@Override
public Object postProcess(Message<?> message, Object result) {
public @Nullable Object postProcess(Message<?> message, @Nullable Object result) {
if (result == null) {
MessageChannel channelToDiscard = getDiscardChannel();
if (channelToDiscard != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public void setObservationConvention(@Nullable MessageReceiverObservationConvent
this.observationConvention = observationConvention;
}

@Override // NOSONAR
@Override
public void handleMessage(Message<?> message) {
Assert.notNull(message, "Message must not be null");
if (isLoggingEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@
public abstract class AbstractMessageProducingHandler extends AbstractMessageHandler
implements MessageProducer, HeaderPropagationAware {

protected final MessagingTemplate messagingTemplate = new MessagingTemplate(); // NOSONAR final
protected final MessagingTemplate messagingTemplate = new MessagingTemplate();

private boolean async;

Expand All @@ -88,7 +88,7 @@ public abstract class AbstractMessageProducingHandler extends AbstractMessageHan
@Nullable
private MessageChannel outputChannel;

private String[] notPropagatedHeaders;
private String @Nullable [] notPropagatedHeaders;

private boolean selectiveHeaderPropagation;

Expand All @@ -113,7 +113,7 @@ public void setOutputChannel(MessageChannel outputChannel) {
@Override
public void setOutputChannelName(String outputChannelName) {
Assert.hasText(outputChannelName, "'outputChannelName' must not be empty");
this.outputChannelName = outputChannelName; //NOSONAR (inconsistent sync)
this.outputChannelName = outputChannelName;
}

/**
Expand Down Expand Up @@ -213,12 +213,10 @@ public void addNotPropagatedHeaders(String... headers) {
@Override
protected void onInit() {
super.onInit();
Assert.state(!(this.outputChannelName != null && this.outputChannel != null), //NOSONAR (inconsistent sync)
Assert.state(!(this.outputChannelName != null && this.outputChannel != null),
"'outputChannelName' and 'outputChannel' are mutually exclusive.");
BeanFactory beanFactory = getBeanFactory();
if (beanFactory != null) {
this.messagingTemplate.setBeanFactory(beanFactory);
}
this.messagingTemplate.setBeanFactory(beanFactory);
this.messagingTemplate.setDestinationResolver(getChannelResolver());
setAsyncIfCan();
if (!this.sendTimeoutSet) {
Expand All @@ -244,9 +242,9 @@ public MessageChannel getOutputChannel() {
return this.outputChannel;
}

protected void sendOutputs(Object result, Message<?> requestMessage) {
if (result instanceof Iterable<?> && shouldSplitOutput((Iterable<?>) result)) {
for (Object o : (Iterable<?>) result) {
protected void sendOutputs(@Nullable Object result, Message<?> requestMessage) {
if (result instanceof Iterable<?> iterableResult && shouldSplitOutput(iterableResult)) {
for (Object o : iterableResult) {
produceOutput(o, requestMessage);
}
}
Expand Down Expand Up @@ -294,13 +292,12 @@ protected void produceOutput(Object replyArg, final Message<?> requestMessage) {
private Map<?, ?> obtainRoutingSlipHeader(MessageHeaders requestHeaders, Object reply) {
Map<?, ?> routingSlipHeader = requestHeaders.get(IntegrationMessageHeaderAccessor.ROUTING_SLIP, Map.class);
if (routingSlipHeader == null) {
if (reply instanceof Message) {
routingSlipHeader = ((Message<?>) reply).getHeaders()
if (reply instanceof Message<?> replyMessage) {
routingSlipHeader = replyMessage.getHeaders()
.get(IntegrationMessageHeaderAccessor.ROUTING_SLIP, Map.class);
}
else if (reply instanceof AbstractIntegrationMessageBuilder<?>) {
routingSlipHeader = ((AbstractIntegrationMessageBuilder<?>) reply)
.getHeader(IntegrationMessageHeaderAccessor.ROUTING_SLIP, Map.class);
else if (reply instanceof AbstractIntegrationMessageBuilder<?> messageBuilder) {
routingSlipHeader = messageBuilder.getHeader(IntegrationMessageHeaderAccessor.ROUTING_SLIP, Map.class);
}
}
return routingSlipHeader;
Expand All @@ -310,12 +307,11 @@ else if (reply instanceof AbstractIntegrationMessageBuilder<?>) {
private Object obtainReplyChannel(MessageHeaders requestHeaders, Object reply) {
Object replyChannel = requestHeaders.getReplyChannel();
if (replyChannel == null) {
if (reply instanceof Message) {
replyChannel = ((Message<?>) reply).getHeaders().getReplyChannel();
if (reply instanceof Message<?> replyMessage) {
replyChannel = replyMessage.getHeaders().getReplyChannel();
}
else if (reply instanceof AbstractIntegrationMessageBuilder<?>) {
replyChannel = ((AbstractIntegrationMessageBuilder<?>) reply)
.getHeader(MessageHeaders.REPLY_CHANNEL, Object.class);
else if (reply instanceof AbstractIntegrationMessageBuilder<?> messageBuilder) {
replyChannel = messageBuilder.getHeader(MessageHeaders.REPLY_CHANNEL, Object.class);
}
}
return replyChannel;
Expand Down Expand Up @@ -438,15 +434,15 @@ else if (reply instanceof AbstractIntegrationMessageBuilder) {
return builder;
}

private Object getOutputChannelFromRoutingSlip(Object reply, Message<?> requestMessage, List<?> routingSlip,
private @Nullable Object getOutputChannelFromRoutingSlip(Object reply, Message<?> requestMessage, List<?> routingSlip,
AtomicInteger routingSlipIndex) {

if (routingSlipIndex.get() >= routingSlip.size()) {
return null;
}

Object path = routingSlip.get(routingSlipIndex.get());
Object routingSlipPathValue = null;
Object routingSlipPathValue;

if (path instanceof String string) {
routingSlipPathValue = getBeanFactory().getBean(string);
Expand Down Expand Up @@ -558,7 +554,7 @@ protected boolean shouldCopyRequestHeaders() {
protected void sendErrorMessage(Message<?> requestMessage, Throwable ex) {
Object errorChannel = resolveErrorChannel(requestMessage.getHeaders());
Throwable result = ex;
if (!(ex instanceof MessagingException)) {
if (!(result instanceof MessagingException)) {
result = new MessageHandlingException(requestMessage, ex);
}
if (errorChannel == null) {
Expand All @@ -578,7 +574,7 @@ protected void sendErrorMessage(Message<?> requestMessage, Throwable ex) {
}
}

protected Object resolveErrorChannel(final MessageHeaders requestHeaders) {
protected @Nullable Object resolveErrorChannel(final MessageHeaders requestHeaders) {
Object errorChannel = requestHeaders.getErrorChannel();
if (errorChannel == null) {
try {
Expand All @@ -594,12 +590,10 @@ protected Object resolveErrorChannel(final MessageHeaders requestHeaders) {
protected void setupMessageProcessor(MessageProcessor<?> processor) {
if (processor instanceof AbstractMessageProcessor<?> abstractMessageProcessor) {
ConversionService conversionService = getConversionService();
if (conversionService != null) {
abstractMessageProcessor.setConversionService(conversionService);
}
abstractMessageProcessor.setConversionService(conversionService);
}
BeanFactory beanFactory = getBeanFactory();
if (processor instanceof BeanFactoryAware beanFactoryAware && beanFactory != null) {
if (processor instanceof BeanFactoryAware beanFactoryAware) {
beanFactoryAware.setBeanFactory(beanFactory);
}
if (processor instanceof InitializingBean initializingBean) {
Expand Down Expand Up @@ -628,7 +622,7 @@ private final class ReplyFutureCallback implements BiConsumer<Object, Throwable>
}

@Override
public void accept(Object result, Throwable exception) {
public void accept(@Nullable Object result, @Nullable Throwable exception) {
if (result != null) {
Message<?> replyMessage = null;
try {
Expand All @@ -637,7 +631,7 @@ public void accept(Object result, Throwable exception) {
}
catch (Exception ex) {
Exception exceptionToLogAndSend = ex;
if (!(ex instanceof MessagingException)) { // NOSONAR
if (!(ex instanceof MessagingException)) {
exceptionToLogAndSend = new MessageHandlingException(this.requestMessage, ex);
if (replyMessage != null) {
exceptionToLogAndSend = new MessagingException(replyMessage, exceptionToLogAndSend);
Expand All @@ -648,7 +642,8 @@ public void accept(Object result, Throwable exception) {
}
}
else if (exception != null) {
onFailure(exception instanceof CompletionException ? exception.getCause() : exception);
onFailure(exception instanceof CompletionException && exception.getCause() != null
? exception.getCause() : exception);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import org.springframework.integration.handler.advice.HandleMessageAdvice;
import org.springframework.messaging.Message;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;

/**
* Base class for MessageHandlers that are capable of producing replies.
Expand All @@ -53,11 +52,12 @@ public abstract class AbstractReplyProducingMessageHandler extends AbstractMessa

private final List<Advice> adviceChain = new LinkedList<>();

private ClassLoader beanClassLoader = ClassUtils.getDefaultClassLoader();
@SuppressWarnings("NullAway.Init")
private ClassLoader beanClassLoader;

private boolean requiresReply = false;

private volatile RequestHandler advisedRequestHandler;
private volatile @Nullable RequestHandler advisedRequestHandler;

/**
* Flag whether a reply is required. If true an incoming message MUST result in a reply message being sent.
Expand Down Expand Up @@ -160,8 +160,8 @@ else if (!isAsync() && isLoggingEnabled()) {
}
}

@Nullable
protected Object doInvokeAdvisedRequestHandler(Message<?> message) {
@SuppressWarnings("NullAway") // dataflow analysis limitation
protected @Nullable Object doInvokeAdvisedRequestHandler(Message<?> message) {
return this.advisedRequestHandler.handleRequestMessage(message);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,15 @@ public class BeanNameMessageProcessor<T> implements MessageProcessor<T>, BeanFac

private final String beanName;

private final String methodName;
private final @Nullable String methodName;

@SuppressWarnings("NullAway.Init")
private MessageProcessor<T> delegate;

@SuppressWarnings("NullAway.Init")
private BeanFactory beanFactory;

public BeanNameMessageProcessor(String object, String methodName) {
public BeanNameMessageProcessor(String object, @Nullable String methodName) {
this.beanName = object;
this.methodName = methodName;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

import java.util.List;

import org.jspecify.annotations.Nullable;

import org.springframework.expression.Expression;
import org.springframework.integration.IntegrationMessageHeaderAccessor;
import org.springframework.integration.IntegrationPattern;
Expand All @@ -41,6 +43,7 @@
public class ControlBusMessageProcessor extends AbstractMessageProcessor<Object>
implements IntegrationPattern {

@SuppressWarnings("NullAway.Init")
private ControlBusCommandRegistry controlBusCommandRegistry;

public ControlBusMessageProcessor() {
Expand Down Expand Up @@ -69,7 +72,7 @@ protected void onInit() {
}

@Override
public Object processMessage(Message<?> message) {
public @Nullable Object processMessage(Message<?> message) {
String command = message.getPayload().toString();
@SuppressWarnings("unchecked")
List<Object> arguments =
Expand Down
Loading
Loading