Skip to content

Commit d60fc95

Browse files
committed
SourcePollingChannelAdapterErrorObservationTests: race condition
The `try-with-resource` on the `ApplicationContext` may stop this context before the message is handled as expected in the flow logic. The timing race condition is due to a scheduling nature of the `SourcePollingChannelAdapter` Fix the `SourcePollingChannelAdapterErrorObservationTests` with a `CountDownLatch` to wait in the `try-with-resource` block until it is fulfilled in the `afterSendCompletion()` of the `ChannelInterceptor` on the `errorChannel` **Auto-cherry-pick to `6.5.x`**
1 parent 8243c7f commit d60fc95

File tree

1 file changed

+21
-1
lines changed

1 file changed

+21
-1
lines changed

spring-integration-core/src/test/java/org/springframework/integration/support/management/observation/SourcePollingChannelAdapterErrorObservationTests.java

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,9 @@
1616

1717
package org.springframework.integration.support.management.observation;
1818

19+
import java.util.concurrent.CountDownLatch;
20+
import java.util.concurrent.TimeUnit;
21+
1922
import io.micrometer.observation.ObservationRegistry;
2023
import io.micrometer.tracing.Span;
2124
import io.micrometer.tracing.test.SampleTestRunner;
@@ -34,6 +37,9 @@
3437
import org.springframework.integration.config.EnableIntegrationManagement;
3538
import org.springframework.integration.scheduling.PollerMetadata;
3639
import org.springframework.integration.test.util.OnlyOnceTrigger;
40+
import org.springframework.messaging.Message;
41+
import org.springframework.messaging.MessageChannel;
42+
import org.springframework.messaging.support.ChannelInterceptor;
3743

3844
import static org.assertj.core.api.Assertions.assertThat;
3945
import static org.awaitility.Awaitility.await;
@@ -59,6 +65,8 @@ public SampleTestRunnerConsumer yourCode() {
5965
applicationContext.registerBean(ObservationRegistry.class, () -> observationRegistry);
6066
applicationContext.register(ObservationIntegrationTestConfiguration.class);
6167
applicationContext.refresh();
68+
var testConfiguration = applicationContext.getBean(ObservationIntegrationTestConfiguration.class);
69+
assertThat(testConfiguration.errorHandledLatch.await(10, TimeUnit.SECONDS)).isTrue();
6270
}
6371

6472
await().untilAsserted(() -> assertThat(bb.getFinishedSpans()).hasSize(5));
@@ -93,6 +101,8 @@ public SampleTestRunnerConsumer yourCode() {
93101
@EnableIntegrationManagement(observationPatterns = "*")
94102
static class ObservationIntegrationTestConfiguration {
95103

104+
CountDownLatch errorHandledLatch = new CountDownLatch(1);
105+
96106
@Bean
97107
PollerMetadata pollerMetadata() {
98108
PollerMetadata pollerMetadata = new PollerMetadata();
@@ -115,7 +125,17 @@ void processData(String data) {
115125
@Bean
116126
@BridgeTo("nullChannel")
117127
DirectChannel errorChannel() {
118-
return new DirectChannel();
128+
DirectChannel directChannel = new DirectChannel();
129+
directChannel.addInterceptor(new ChannelInterceptor() {
130+
131+
@Override
132+
public void afterSendCompletion(Message<?> message, MessageChannel channel, boolean sent, Exception ex) {
133+
// Ensure that the error is handled before the application context is closed in the test above.
134+
ObservationIntegrationTestConfiguration.this.errorHandledLatch.countDown();
135+
}
136+
137+
});
138+
return directChannel;
119139
}
120140

121141
}

0 commit comments

Comments
 (0)