62
62
import org .springframework .transaction .support .TransactionSynchronization ;
63
63
import org .springframework .transaction .support .TransactionSynchronizationManager ;
64
64
import org .springframework .util .Assert ;
65
- import org .springframework .util .ClassUtils ;
66
65
import org .springframework .util .CollectionUtils ;
67
66
import org .springframework .util .ErrorHandler ;
68
67
import org .springframework .util .ReflectionUtils ;
@@ -98,27 +97,29 @@ public abstract class AbstractPollingEndpoint extends AbstractEndpoint implement
98
97
99
98
private boolean syncExecutor = true ;
100
99
101
- private ClassLoader beanClassLoader = ClassUtils .getDefaultClassLoader ();
100
+ @ SuppressWarnings ("NullAway.Init" )
101
+ private ClassLoader beanClassLoader ;
102
102
103
103
private Trigger trigger = new PeriodicTrigger (Duration .ofMillis (DEFAULT_POLLING_PERIOD ));
104
104
105
- private ErrorHandler errorHandler ;
105
+ private @ Nullable ErrorHandler errorHandler ;
106
106
107
107
private boolean errorHandlerIsDefault ;
108
108
109
- private List <Advice > adviceChain ;
109
+ private @ Nullable List <Advice > adviceChain ;
110
110
111
- private TransactionSynchronizationFactory transactionSynchronizationFactory ;
111
+ private @ Nullable TransactionSynchronizationFactory transactionSynchronizationFactory ;
112
112
113
113
private volatile long maxMessagesPerPoll = -1 ;
114
114
115
- private volatile Callable <Message <?>> pollingTask ;
115
+ @ SuppressWarnings ("NullAway.Init" )
116
+ private volatile Callable <@ Nullable Message <?>> pollingTask ;
116
117
117
- private volatile Flux <Message <?>> pollingFlux ;
118
+ private volatile @ Nullable Flux <Message <?>> pollingFlux ;
118
119
119
- private volatile Subscription subscription ;
120
+ private volatile @ Nullable Subscription subscription ;
120
121
121
- private volatile ScheduledFuture <?> runningTask ;
122
+ private volatile @ Nullable ScheduledFuture <?> runningTask ;
122
123
123
124
private volatile boolean initialized ;
124
125
@@ -127,11 +128,11 @@ public AbstractPollingEndpoint() {
127
128
this .setPhase (Integer .MAX_VALUE / 2 );
128
129
}
129
130
130
- public void setTaskExecutor (Executor taskExecutor ) {
131
+ public void setTaskExecutor (@ Nullable Executor taskExecutor ) {
131
132
this .taskExecutor = (taskExecutor != null ? taskExecutor : new SyncTaskExecutor ());
132
133
this .syncExecutor = this .taskExecutor instanceof SyncTaskExecutor
133
- || (this .taskExecutor instanceof ErrorHandlingTaskExecutor
134
- && (( ErrorHandlingTaskExecutor ) this . taskExecutor ) .isSyncExecutor ());
134
+ || (this .taskExecutor instanceof ErrorHandlingTaskExecutor errorHandlingTaskExecutor
135
+ && errorHandlingTaskExecutor .isSyncExecutor ());
135
136
}
136
137
137
138
protected Executor getTaskExecutor () {
@@ -142,11 +143,11 @@ protected boolean isSyncExecutor() {
142
143
return this .syncExecutor ;
143
144
}
144
145
145
- public void setTrigger (Trigger trigger ) {
146
+ public void setTrigger (@ Nullable Trigger trigger ) {
146
147
this .trigger = (trigger != null ? trigger : new PeriodicTrigger (Duration .ofMillis (DEFAULT_POLLING_PERIOD )));
147
148
}
148
149
149
- public void setAdviceChain (List <Advice > adviceChain ) {
150
+ public void setAdviceChain (@ Nullable List <Advice > adviceChain ) {
150
151
this .adviceChain = adviceChain ;
151
152
}
152
153
@@ -167,7 +168,7 @@ public long getMaxMessagesPerPoll() {
167
168
return this .maxMessagesPerPoll ;
168
169
}
169
170
170
- public void setErrorHandler (ErrorHandler errorHandler ) {
171
+ public void setErrorHandler (@ Nullable ErrorHandler errorHandler ) {
171
172
this .errorHandler = errorHandler ;
172
173
}
173
174
@@ -177,7 +178,7 @@ public void setBeanClassLoader(ClassLoader classLoader) {
177
178
}
178
179
179
180
public void setTransactionSynchronizationFactory (
180
- TransactionSynchronizationFactory transactionSynchronizationFactory ) {
181
+ @ Nullable TransactionSynchronizationFactory transactionSynchronizationFactory ) {
181
182
182
183
this .transactionSynchronizationFactory = transactionSynchronizationFactory ;
183
184
}
@@ -188,7 +189,7 @@ public void setTransactionSynchronizationFactory(
188
189
* @return the channel or null.
189
190
* @since 4.3
190
191
*/
191
- public MessageChannel getDefaultErrorChannel () {
192
+ public @ Nullable MessageChannel getDefaultErrorChannel () {
192
193
if (!this .errorHandlerIsDefault && this .errorHandler
193
194
instanceof MessagePublishingErrorHandler messagePublishingErrorHandler ) {
194
195
@@ -255,11 +256,11 @@ protected boolean isReactive() {
255
256
return false ;
256
257
}
257
258
258
- protected Flux <Message <?>> getPollingFlux () {
259
+ protected @ Nullable Flux <Message <?>> getPollingFlux () {
259
260
return this .pollingFlux ;
260
261
}
261
262
262
- protected Object getReceiveMessageSource () {
263
+ protected @ Nullable Object getReceiveMessageSource () {
263
264
return null ;
264
265
}
265
266
@@ -275,7 +276,7 @@ protected void onInit() {
275
276
return ;
276
277
}
277
278
Assert .notNull (this .trigger , "Trigger is required" );
278
- if (this . taskExecutor != null && !(this .taskExecutor instanceof ErrorHandlingTaskExecutor )) {
279
+ if (!(this .taskExecutor instanceof ErrorHandlingTaskExecutor )) {
279
280
if (this .errorHandler == null ) {
280
281
this .errorHandler = ChannelUtils .getErrorHandler (getBeanFactory ());
281
282
this .errorHandlerIsDefault = true ;
@@ -314,21 +315,20 @@ protected void doStart() {
314
315
}
315
316
else {
316
317
TaskScheduler taskScheduler = getTaskScheduler ();
317
- Assert .state (taskScheduler != null , "unable to start polling, no taskScheduler available" );
318
318
this .runningTask = taskScheduler .schedule (createPoller (), this .trigger );
319
319
}
320
320
}
321
321
322
322
@ SuppressWarnings ("unchecked" )
323
- private Callable <Message <?>> createPollingTask () {
323
+ private Callable <@ Nullable Message <?>> createPollingTask () {
324
324
List <Advice > receiveOnlyAdviceChain = null ;
325
325
if (!CollectionUtils .isEmpty (this .adviceChain )) {
326
326
receiveOnlyAdviceChain = this .adviceChain .stream ()
327
327
.filter (this ::isReceiveOnlyAdvice )
328
328
.toList ();
329
329
}
330
330
331
- Callable <Message <?>> task = this ::doPoll ;
331
+ Callable <@ Nullable Message <?>> task = this ::doPoll ;
332
332
333
333
List <Advice > advices = this .adviceChain ;
334
334
if (!CollectionUtils .isEmpty (advices )) {
@@ -338,7 +338,7 @@ private Callable<Message<?>> createPollingTask() {
338
338
.filter (advice -> !isReceiveOnlyAdvice (advice ))
339
339
.forEach (proxyFactory ::addAdvice );
340
340
}
341
- task = (Callable <Message <?>>) proxyFactory .getProxy (this .beanClassLoader );
341
+ task = (Callable <@ Nullable Message <?>>) proxyFactory .getProxy (this .beanClassLoader );
342
342
}
343
343
if (!CollectionUtils .isEmpty (receiveOnlyAdviceChain )) {
344
344
applyReceiveOnlyAdviceChain (receiveOnlyAdviceChain );
@@ -418,39 +418,44 @@ private Flux<Message<?>> createFluxGenerator() {
418
418
.doOnSubscribe (subs -> this .subscription = subs );
419
419
}
420
420
421
- private Message <?> pollForMessage () {
421
+ private @ Nullable Message <?> pollForMessage () {
422
422
Exception pollingTaskError = null ;
423
423
try {
424
424
return this .pollingTask .call ();
425
425
}
426
426
catch (Exception ex ) {
427
427
pollingTaskError = ex ;
428
- if (ex instanceof MessagingException messagingException ) { // NOSONAR
428
+ if (ex instanceof MessagingException messagingException ) {
429
429
throw messagingException ;
430
430
}
431
431
else {
432
432
Message <?> failedMessage = null ;
433
433
if (this .transactionSynchronizationFactory != null ) {
434
- Object resource = TransactionSynchronizationManager .getResource (getResourceToBind ());
434
+ Object resource = null ;
435
+ Object resourceToBind = getResourceToBind ();
436
+ if (resourceToBind != null ) {
437
+ resource = TransactionSynchronizationManager .getResource (resourceToBind );
438
+ }
435
439
if (resource instanceof IntegrationResourceHolder integrationResourceHolder ) {
436
440
failedMessage = integrationResourceHolder .getMessage ();
437
441
}
438
442
}
439
- throw new MessagingException (failedMessage , ex ); // NOSONAR (null failedMessage)
443
+ throw failedMessage == null ? new MessagingException ((String ) null , ex )
444
+ : new MessagingException (failedMessage , ex );
440
445
}
441
446
}
442
447
finally {
443
448
if (this .transactionSynchronizationFactory != null ) {
444
449
Object resource = getResourceToBind ();
445
- if (TransactionSynchronizationManager .hasResource (resource )) {
450
+ if (resource != null && TransactionSynchronizationManager .hasResource (resource )) {
446
451
TransactionSynchronizationManager .unbindResource (resource );
447
452
}
448
453
}
449
454
donePollingTask (pollingTaskError );
450
455
}
451
456
}
452
457
453
- private Message <?> doPoll () {
458
+ private @ Nullable Message <?> doPoll () {
454
459
IntegrationResourceHolder holder = bindResourceHolderIfNecessary (getResourceKey (), getResourceToBind ());
455
460
Message <?> message = null ;
456
461
try {
@@ -517,7 +522,7 @@ protected void doStop() {
517
522
* if no message is immediately available.
518
523
* @return The message or null.
519
524
*/
520
- protected abstract Message <?> receiveMessage ();
525
+ protected abstract @ Nullable Message <?> receiveMessage ();
521
526
522
527
/**
523
528
* Handle a message.
@@ -530,7 +535,7 @@ protected void doStop() {
530
535
* synchronization.
531
536
* @return The resource, or null if transaction synchronization is not required.
532
537
*/
533
- protected Object getResourceToBind () {
538
+ protected @ Nullable Object getResourceToBind () {
534
539
return null ;
535
540
}
536
541
@@ -540,14 +545,14 @@ protected Object getResourceToBind() {
540
545
* {@link org.springframework.integration.transaction.ExpressionEvaluatingTransactionSynchronizationProcessor}
541
546
* makes this attribute available as a variable in SpEL expressions.
542
547
* @return The key, or null (default) if the resource shouldn't be
543
- * made available as a attribute.
548
+ * made available as an attribute.
544
549
*/
545
- protected String getResourceKey () {
550
+ protected @ Nullable String getResourceKey () {
546
551
return null ;
547
552
}
548
553
549
554
@ Nullable
550
- private IntegrationResourceHolder bindResourceHolderIfNecessary (String key , Object resource ) {
555
+ private IntegrationResourceHolder bindResourceHolderIfNecessary (@ Nullable String key , @ Nullable Object resource ) {
551
556
if (this .transactionSynchronizationFactory != null && resource != null &&
552
557
TransactionSynchronizationManager .isActualTransactionActive ()) {
553
558
0 commit comments