Skip to content

Commit 31a7dd4

Browse files
committed
Further Nullability cleanup of the core handler package
Related to: #10083 * Fix `LockRegistry` to have a generic argument of the `executeLocked()` as potentially nullable * Fix `ParameterExpressionEvaluator`, according to the contract of super class * Add tentative `NullAway` for `KafkaInboundGateway` & `KafkaMessageDrivenChannelAdapter` until migration from Spring Retry is done * Fix `MongoDbMessageSourceTests` & `ReactiveMongoDbMessageSourceTests` for required now `BeanFactory` * Rework the `AnnotatedTests` logic to verify in the proper place
1 parent a167170 commit 31a7dd4

File tree

38 files changed

+277
-288
lines changed

38 files changed

+277
-288
lines changed

spring-integration-core/src/main/java/org/springframework/integration/channel/MessagePublishingErrorHandler.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,7 @@ private MessageChannel resolveErrorChannel(Throwable t) {
129129
actualThrowable instanceof MessagingException
130130
? ((MessagingException) actualThrowable).getFailedMessage()
131131
: null;
132-
if (getDefaultErrorChannel() == null && channelResolver != null) {
132+
if (getDefaultErrorChannel() == null) {
133133
setChannel(channelResolver.resolveDestination(IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME));
134134
}
135135

@@ -141,7 +141,7 @@ private MessageChannel resolveErrorChannel(Throwable t) {
141141
Assert.isInstanceOf(String.class, errorChannelHeader, () ->
142142
"Unsupported error channel header type. Expected MessageChannel or String, but actual type is [" +
143143
errorChannelHeader.getClass() + "]");
144-
if (channelResolver != null && StringUtils.hasText((String) errorChannelHeader)) {
144+
if (StringUtils.hasText((String) errorChannelHeader)) {
145145
return channelResolver.resolveDestination((String) errorChannelHeader);
146146
}
147147
}

spring-integration-core/src/main/java/org/springframework/integration/config/xml/DefaultOutboundChannelAdapterParser.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,6 @@ else if (scriptElement != null) {
9898
consumerBuilder.addConstructorArgValue(hasMethod ? methodName : "handleMessage");
9999
}
100100

101-
consumerBuilder.addPropertyValue("componentType", "outbound-channel-adapter");
102101
return consumerBuilder.getBeanDefinition();
103102
}
104103

spring-integration-core/src/main/java/org/springframework/integration/core/ErrorMessagePublisher.java

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -50,18 +50,16 @@
5050
*/
5151
public class ErrorMessagePublisher implements BeanFactoryAware {
5252

53-
protected final Log logger = LogFactory.getLog(getClass()); // NOSONAR final
53+
protected final Log logger = LogFactory.getLog(getClass());
5454

55-
protected final MessagingTemplate messagingTemplate = new MessagingTemplate(); // NOSONAR final
55+
protected final MessagingTemplate messagingTemplate = new MessagingTemplate();
5656

5757
@SuppressWarnings("NullAway.Init")
5858
private DestinationResolver<MessageChannel> channelResolver;
5959

60-
@Nullable
61-
private MessageChannel channel;
60+
private @Nullable MessageChannel channel;
6261

63-
@Nullable
64-
private String channelName;
62+
private @Nullable String channelName;
6563

6664
private ErrorMessageStrategy errorMessageStrategy = new DefaultErrorMessageStrategy();
6765

@@ -70,7 +68,7 @@ public final void setErrorMessageStrategy(ErrorMessageStrategy errorMessageStrat
7068
this.errorMessageStrategy = errorMessageStrategy;
7169
}
7270

73-
public final void setChannel(@Nullable MessageChannel channel) {
71+
public final void setChannel(MessageChannel channel) {
7472
this.channel = channel;
7573
}
7674

@@ -82,8 +80,7 @@ public ErrorMessageStrategy getErrorMessageStrategy() {
8280
return this.errorMessageStrategy;
8381
}
8482

85-
@Nullable
86-
public MessageChannel getChannel() {
83+
public @Nullable MessageChannel getChannel() {
8784
populateChannel();
8885
return this.channel;
8986
}

spring-integration-core/src/main/java/org/springframework/integration/endpoint/MessageProcessorMessageSource.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,9 @@
3535
*/
3636
public class MessageProcessorMessageSource extends AbstractMessageSource<Object> {
3737

38-
// provide a fake message since the contract of processMessage requires a NonNull Message.
38+
/**
39+
* A fake message since the {@link MessageProcessor#processMessage(Message)} requires a non-null.
40+
*/
3941
static final Message<Object> FAKE_MESSAGE = MutableMessageBuilder.withPayload(new Object(), false).build();
4042

4143
private final MessageProcessor<?> messageProcessor;

spring-integration-core/src/main/java/org/springframework/integration/filter/MessageFilter.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@
3737
* Message Handler that delegates to a {@link MessageSelector}. If and only if
3838
* the selector {@link MessageSelector#accept(Message) accepts} the Message, it
3939
* will be passed to this filter's output channel. Otherwise, the message will
40-
* either be silently dropped (the default) with a warning into logs,
40+
* either be silently dropped (the default) with a warning into logs
4141
* or will trigger the throwing of a {@link MessageRejectedException}
4242
* depending on the value of its {@link #throwExceptionOnRejection} property.
4343
* If a discard channel is provided, the rejected Messages will be sent to that channel.
@@ -55,9 +55,9 @@ public class MessageFilter extends AbstractReplyProducingPostProcessingMessageHa
5555

5656
private boolean throwExceptionOnRejection;
5757

58-
private @Nullable MessageChannel discardChannel;
58+
private @Nullable MessageChannel discardChannel;
5959

60-
private @Nullable String discardChannelName;
60+
private @Nullable String discardChannelName;
6161

6262
/**
6363
* Create a MessageFilter that will delegate to the given {@link MessageSelector}.
@@ -113,7 +113,7 @@ public void setDiscardWithinAdvice(boolean discardWithinAdvice) {
113113
}
114114

115115
@Override
116-
public @Nullable MessageChannel getDiscardChannel() {
116+
public @Nullable MessageChannel getDiscardChannel() {
117117
String channelName = this.discardChannelName;
118118
if (channelName != null) {
119119
this.discardChannel = getChannelResolver().resolveDestination(channelName);
@@ -148,21 +148,21 @@ protected void doInit() {
148148

149149
@Override
150150
public void start() {
151-
if (this.selector instanceof Lifecycle) {
152-
((Lifecycle) this.selector).start();
151+
if (this.selector instanceof Lifecycle lifecycle) {
152+
lifecycle.start();
153153
}
154154
}
155155

156156
@Override
157157
public void stop() {
158-
if (this.selector instanceof Lifecycle) {
159-
((Lifecycle) this.selector).stop();
158+
if (this.selector instanceof Lifecycle lifecycle) {
159+
lifecycle.stop();
160160
}
161161
}
162162

163163
@Override
164164
public boolean isRunning() {
165-
return !(this.selector instanceof Lifecycle) || ((Lifecycle) this.selector).isRunning();
165+
return !(this.selector instanceof Lifecycle lifecycle) || lifecycle.isRunning();
166166
}
167167

168168
@Override

spring-integration-core/src/main/java/org/springframework/integration/handler/AbstractMessageHandler.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@
3131
import org.springframework.integration.support.utils.IntegrationUtils;
3232
import org.springframework.messaging.Message;
3333
import org.springframework.messaging.MessageHandler;
34-
import org.springframework.util.Assert;
3534

3635
/**
3736
* Base class for {@link MessageHandler} implementations.
@@ -42,8 +41,7 @@
4241
public abstract class AbstractMessageHandler extends MessageHandlerSupport
4342
implements MessageHandler, CoreSubscriber<Message<?>> {
4443

45-
@Nullable
46-
private MessageReceiverObservationConvention observationConvention;
44+
private @Nullable MessageReceiverObservationConvention observationConvention;
4745

4846
/**
4947
* Set a custom {@link MessageReceiverObservationConvention} for {@link IntegrationObservation#HANDLER}.
@@ -57,7 +55,6 @@ public void setObservationConvention(@Nullable MessageReceiverObservationConvent
5755

5856
@Override
5957
public void handleMessage(Message<?> message) {
60-
Assert.notNull(message, "Message must not be null");
6158
if (isLoggingEnabled()) {
6259
this.logger.debug(() -> this + " received message: " + message);
6360
}
@@ -112,7 +109,6 @@ private void doHandleMessage(Message<?> message) {
112109

113110
@Override
114111
public void onSubscribe(Subscription subscription) {
115-
Assert.notNull(subscription, "'subscription' must not be null");
116112
subscription.request(Long.MAX_VALUE);
117113
}
118114

spring-integration-core/src/main/java/org/springframework/integration/handler/AbstractMessageProducingHandler.java

Lines changed: 32 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -82,11 +82,9 @@ public abstract class AbstractMessageProducingHandler extends AbstractMessageHan
8282

8383
private boolean asyncExplicitlySet;
8484

85-
@Nullable
86-
private String outputChannelName;
85+
private @Nullable String outputChannelName;
8786

88-
@Nullable
89-
private MessageChannel outputChannel;
87+
private @Nullable MessageChannel outputChannel;
9088

9189
private String @Nullable [] notPropagatedHeaders;
9290

@@ -142,7 +140,7 @@ protected boolean isAsync() {
142140
* that will NOT be copied from the inbound message if
143141
* {@link #shouldCopyRequestHeaders() shouldCopyRequestHeaaders} is true.
144142
* At least one pattern as "*" means do not copy headers at all.
145-
* @param headers the headers to not propagate from the inbound message.
143+
* @param headers the headers do not propagate from the inbound message.
146144
* @since 4.3.10
147145
* @see org.springframework.util.PatternMatchUtils
148146
*/
@@ -201,7 +199,7 @@ public Collection<String> getNotPropagatedHeaders() {
201199
* Add header patterns ("xxx*", "*xxx", "*xxx*" or "xxx*yyy")
202200
* that will NOT be copied from the inbound message if
203201
* {@link #shouldCopyRequestHeaders()} is true, instead of overwriting the existing set.
204-
* @param headers the headers to not propagate from the inbound message.
202+
* @param headers the headers do not propagate from the inbound message.
205203
* @since 4.3.10
206204
* @see #setNotPropagatedHeaders(String...)
207205
*/
@@ -262,7 +260,7 @@ protected boolean shouldSplitOutput(Iterable<?> reply) {
262260
return false;
263261
}
264262

265-
protected void produceOutput(Object replyArg, final Message<?> requestMessage) {
263+
protected void produceOutput(Object replyArg, Message<?> requestMessage) {
266264
MessageHeaders requestHeaders = requestMessage.getHeaders();
267265
Object reply = replyArg;
268266
Object replyChannel = null;
@@ -288,8 +286,7 @@ protected void produceOutput(Object replyArg, final Message<?> requestMessage) {
288286
doProduceOutput(requestMessage, requestHeaders, reply, replyChannel);
289287
}
290288

291-
@Nullable
292-
private Map<?, ?> obtainRoutingSlipHeader(MessageHeaders requestHeaders, Object reply) {
289+
private @Nullable Map<?, ?> obtainRoutingSlipHeader(MessageHeaders requestHeaders, Object reply) {
293290
Map<?, ?> routingSlipHeader = requestHeaders.get(IntegrationMessageHeaderAccessor.ROUTING_SLIP, Map.class);
294291
if (routingSlipHeader == null) {
295292
if (reply instanceof Message<?> replyMessage) {
@@ -303,8 +300,7 @@ else if (reply instanceof AbstractIntegrationMessageBuilder<?> messageBuilder) {
303300
return routingSlipHeader;
304301
}
305302

306-
@Nullable
307-
private Object obtainReplyChannel(MessageHeaders requestHeaders, Object reply) {
303+
private @Nullable Object obtainReplyChannel(MessageHeaders requestHeaders, Object reply) {
308304
Object replyChannel = requestHeaders.getReplyChannel();
309305
if (replyChannel == null) {
310306
if (reply instanceof Message<?> replyMessage) {
@@ -422,20 +418,20 @@ private AbstractIntegrationMessageBuilder<?> addRoutingSlipHeader(Object reply,
422418

423419
protected AbstractIntegrationMessageBuilder<?> messageBuilderForReply(Object reply) {
424420
AbstractIntegrationMessageBuilder<?> builder;
425-
if (reply instanceof Message) {
426-
builder = getMessageBuilderFactory().fromMessage((Message<?>) reply);
421+
if (reply instanceof Message<?> message) {
422+
builder = getMessageBuilderFactory().fromMessage(message);
427423
}
428-
else if (reply instanceof AbstractIntegrationMessageBuilder) {
429-
builder = (AbstractIntegrationMessageBuilder<?>) reply;
424+
else if (reply instanceof AbstractIntegrationMessageBuilder<?> messageBuilder) {
425+
builder = messageBuilder;
430426
}
431427
else {
432428
builder = getMessageBuilderFactory().withPayload(reply);
433429
}
434430
return builder;
435431
}
436432

437-
private @Nullable Object getOutputChannelFromRoutingSlip(Object reply, Message<?> requestMessage, List<?> routingSlip,
438-
AtomicInteger routingSlipIndex) {
433+
private @Nullable Object getOutputChannelFromRoutingSlip(Object reply, Message<?> requestMessage,
434+
List<?> routingSlip, AtomicInteger routingSlipIndex) {
439435

440436
if (routingSlipIndex.get() >= routingSlip.size()) {
441437
return null;
@@ -473,18 +469,18 @@ else if (path instanceof RoutingSlipRouteStrategy) {
473469

474470
protected Message<?> createOutputMessage(Object output, MessageHeaders requestHeaders) {
475471
AbstractIntegrationMessageBuilder<?> builder;
476-
if (output instanceof Message<?>) {
472+
if (output instanceof Message<?> message) {
477473
if (this.noHeadersPropagation || !shouldCopyRequestHeaders()) {
478-
return (Message<?>) output;
474+
return message;
479475
}
480-
builder = getMessageBuilderFactory().fromMessage((Message<?>) output);
476+
builder = getMessageBuilderFactory().fromMessage(message);
481477
}
482-
else if (output instanceof AbstractIntegrationMessageBuilder) {
483-
builder = (AbstractIntegrationMessageBuilder<?>) output;
478+
else if (output instanceof AbstractIntegrationMessageBuilder<?> messageBuilder) {
479+
builder = messageBuilder;
484480
}
485481
else {
486482
builder = getMessageBuilderFactory().withPayload(output);
487-
// Assuming that message in the payload collection is a copy of request message.
483+
// Assuming that a message in the payload collection is a copy of the request message.
488484
if (output instanceof Iterable<?> iterable) {
489485
Iterator<?> iterator = iterable.iterator();
490486
if (iterator.hasNext() && iterator.next() instanceof Message<?>) {
@@ -523,16 +519,16 @@ protected void sendOutput(Object output, @Nullable Object replyChannelArg, boole
523519
}
524520

525521
if (replyChannel instanceof MessageChannel messageChannel) {
526-
if (output instanceof Message<?>) {
527-
this.messagingTemplate.send(messageChannel, (Message<?>) output);
522+
if (output instanceof Message<?> message) {
523+
this.messagingTemplate.send(messageChannel, message);
528524
}
529525
else {
530526
this.messagingTemplate.convertAndSend(messageChannel, output);
531527
}
532528
}
533529
else if (replyChannel instanceof String string) {
534-
if (output instanceof Message<?>) {
535-
this.messagingTemplate.send(string, (Message<?>) output);
530+
if (output instanceof Message<?> message) {
531+
this.messagingTemplate.send(string, message);
536532
}
537533
else {
538534
this.messagingTemplate.convertAndSend(string, output);
@@ -613,8 +609,7 @@ private final class ReplyFutureCallback implements BiConsumer<Object, Throwable>
613609

614610
private final Message<?> requestMessage;
615611

616-
@Nullable
617-
private final Object replyChannel;
612+
private final @Nullable Object replyChannel;
618613

619614
ReplyFutureCallback(Message<?> requestMessage, @Nullable Object replyChannel) {
620615
this.requestMessage = requestMessage;
@@ -642,8 +637,14 @@ public void accept(@Nullable Object result, @Nullable Throwable exception) {
642637
}
643638
}
644639
else if (exception != null) {
645-
onFailure(exception instanceof CompletionException && exception.getCause() != null
646-
? exception.getCause() : exception);
640+
Throwable throwableToHandle = exception;
641+
if (exception instanceof CompletionException) {
642+
Throwable cause = exception.getCause();
643+
if (cause != null) {
644+
throwableToHandle = cause;
645+
}
646+
}
647+
onFailure(throwableToHandle);
647648
}
648649
}
649650

spring-integration-core/src/main/java/org/springframework/integration/handler/AbstractReplyProducingMessageHandler.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -60,8 +60,8 @@ public abstract class AbstractReplyProducingMessageHandler extends AbstractMessa
6060
private volatile @Nullable RequestHandler advisedRequestHandler;
6161

6262
/**
63-
* Flag whether a reply is required. If true an incoming message MUST result in a reply message being sent.
64-
* If false an incoming message MAY result in a reply message being sent. Default is false.
63+
* Flag whether a reply is required. If {@code true} an incoming message MUST result in a reply message being sent.
64+
* If {@code false} an incoming message may result in a reply message being sent. Default is false.
6565
* @param requiresReply true if a reply is required.
6666
*/
6767
public void setRequiresReply(boolean requiresReply) {
@@ -107,9 +107,9 @@ protected ClassLoader getBeanClassLoader() {
107107
@Override
108108
public IntegrationPatternType getIntegrationPatternType() {
109109
// Most out-of-the-box Spring Integration implementations provide an outbound gateway
110-
// for particular external protocol. If an implementation doesn't belong to this category,
110+
// for a particular external protocol. If an implementation doesn't belong to this category,
111111
// it overrides this method to provide its own specific integration pattern type:
112-
// service-activator, splitter, aggregator, router etc.
112+
// service-activator, splitter, aggregator, router, etc.
113113
return IntegrationPatternType.outbound_gateway;
114114
}
115115

@@ -211,8 +211,7 @@ private class AdvisedRequestHandler implements RequestHandler {
211211
}
212212

213213
@Override
214-
@Nullable
215-
public Object handleRequestMessage(Message<?> requestMessage) {
214+
public @Nullable Object handleRequestMessage(Message<?> requestMessage) {
216215
return AbstractReplyProducingMessageHandler.this.handleRequestMessage(requestMessage);
217216
}
218217

spring-integration-core/src/main/java/org/springframework/integration/handler/ControlBusMessageProcessor.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -79,10 +79,10 @@ protected void onInit() {
7979
message.getHeaders().get(IntegrationMessageHeaderAccessor.CONTROL_BUS_ARGUMENTS, List.class);
8080
Class<?>[] parameterTypes = {};
8181
if (!CollectionUtils.isEmpty(arguments)) {
82-
parameterTypes =
83-
arguments.stream()
84-
.map(Object::getClass)
85-
.toArray(Class<?>[]::new);
82+
parameterTypes = new Class<?>[arguments.size()];
83+
for (int i = 0; i < arguments.size(); i++) {
84+
parameterTypes[i] = arguments.get(i).getClass();
85+
}
8686
}
8787
Expression commandExpression = this.controlBusCommandRegistry.getExpressionForCommand(command, parameterTypes);
8888
return evaluateExpression(commandExpression, arguments);

0 commit comments

Comments
 (0)