Skip to content

Commit 8243c7f

Browse files
committed
Internal ObservationRegistry in the ErrorHandlingTaskExecutor
There is no need to configure an external `ObservationRegistry` on the `ErrorHandlingTaskExecutor`. We only use it for closing currently in scope `Observation` after error handling on the task.
1 parent cfc9568 commit 8243c7f

File tree

3 files changed

+30
-52
lines changed

3 files changed

+30
-52
lines changed

spring-integration-core/src/main/java/org/springframework/integration/endpoint/SourcePollingChannelAdapter.java

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,6 @@
4444
import org.springframework.integration.support.management.observation.MessageReceiverContext;
4545
import org.springframework.integration.support.management.observation.MessageReceiverObservationConvention;
4646
import org.springframework.integration.transaction.IntegrationResourceHolder;
47-
import org.springframework.integration.util.ErrorHandlingTaskExecutor;
4847
import org.springframework.messaging.Message;
4948
import org.springframework.messaging.MessageChannel;
5049
import org.springframework.messaging.MessagingException;
@@ -142,12 +141,6 @@ public void setShouldTrack(boolean shouldTrack) {
142141
@Override
143142
public void registerObservationRegistry(ObservationRegistry observationRegistry) {
144143
this.observationRegistry = observationRegistry;
145-
if (isObserved()) {
146-
ErrorHandlingTaskExecutor taskExecutor = (ErrorHandlingTaskExecutor) getTaskExecutor();
147-
if (taskExecutor.getObservationRegistry() == null) {
148-
taskExecutor.setObservationRegistry(observationRegistry);
149-
}
150-
}
151144
}
152145

153146
/**
@@ -287,6 +280,7 @@ protected void messageReceived(@Nullable IntegrationResourceHolder holder, Messa
287280
/**
288281
* Stop an observation (and close its scope) previously started
289282
* from the {@link #messageReceived(IntegrationResourceHolder, Message)}.
283+
* @param message the received message. Can be {@code null}; ignored in this implementation.
290284
*/
291285
@Override
292286
protected void donePollingTask(@Nullable Message<?> message) {

spring-integration-core/src/main/java/org/springframework/integration/util/ErrorHandlingTaskExecutor.java

Lines changed: 9 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020

2121
import io.micrometer.observation.Observation;
2222
import io.micrometer.observation.ObservationRegistry;
23-
import org.jspecify.annotations.Nullable;
2423

2524
import org.springframework.core.task.SyncTaskExecutor;
2625
import org.springframework.core.task.TaskExecutor;
@@ -32,8 +31,7 @@
3231
* instance in order to catch any exceptions. If an exception is thrown, it
3332
* will be handled by the provided {@link ErrorHandler}.
3433
* <p>
35-
* If an {@link ObservationRegistry} is provided, the current observation in scope
36-
* will be closed after error handling.
34+
* After invoking {@link ErrorHandler}, the current observation scope will be closed.
3735
*
3836
* @author Jonas Partner
3937
* @author Mark Fisher
@@ -42,32 +40,19 @@
4240
*/
4341
public class ErrorHandlingTaskExecutor implements TaskExecutor {
4442

43+
private static final ObservationRegistry FOR_SCOPES = ObservationRegistry.create();
44+
4545
private final Executor executor;
4646

4747
private final ErrorHandler errorHandler;
4848

49-
private @Nullable ObservationRegistry observationRegistry;
50-
5149
public ErrorHandlingTaskExecutor(Executor executor, ErrorHandler errorHandler) {
5250
Assert.notNull(executor, "executor must not be null");
5351
Assert.notNull(errorHandler, "errorHandler must not be null");
5452
this.executor = executor;
5553
this.errorHandler = errorHandler;
5654
}
5755

58-
/**
59-
* Set an {@link ObservationRegistry} to close current observation in scope after error handling.
60-
* @param observationRegistry the {@link ObservationRegistry} to use.
61-
* @since 6.5.3
62-
*/
63-
public void setObservationRegistry(ObservationRegistry observationRegistry) {
64-
this.observationRegistry = observationRegistry;
65-
}
66-
67-
public @Nullable ObservationRegistry getObservationRegistry() {
68-
return this.observationRegistry;
69-
}
70-
7156
public boolean isSyncExecutor() {
7257
return this.executor instanceof SyncTaskExecutor;
7358
}
@@ -83,14 +68,12 @@ public void execute(final Runnable task) {
8368
ErrorHandlingTaskExecutor.this.errorHandler.handleError(throwable);
8469
}
8570
finally {
86-
if (this.observationRegistry != null) {
87-
var currentObservationScope = this.observationRegistry.getCurrentObservationScope();
88-
if (currentObservationScope != null) {
89-
currentObservationScope.close();
90-
Observation currentObservation = currentObservationScope.getCurrentObservation();
91-
currentObservation.error(throwable);
92-
currentObservation.stop();
93-
}
71+
var currentObservationScope = FOR_SCOPES.getCurrentObservationScope();
72+
if (currentObservationScope != null) {
73+
currentObservationScope.close();
74+
Observation currentObservation = currentObservationScope.getCurrentObservation();
75+
currentObservation.error(throwable);
76+
currentObservation.stop();
9477
}
9578
}
9679
}

spring-integration-core/src/test/java/org/springframework/integration/support/management/observation/SourcePollingChannelAdapterErrorObservationTests.java

Lines changed: 20 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -65,25 +65,26 @@ public SampleTestRunnerConsumer yourCode() {
6565

6666
SpansAssert.assertThat(bb.getFinishedSpans())
6767
.haveSameTraceId()
68-
.hasASpanWithName("dataMessageSource receive", spanAssert -> spanAssert
69-
.hasTag(IntegrationObservation.GatewayTags.COMPONENT_TYPE.asString(), "message-source")
70-
.hasKindEqualTo(Span.Kind.CONSUMER))
71-
.hasASpanWithName("inputChannel send", spanAssert -> spanAssert
72-
.hasTag(IntegrationObservation.ProducerTags.COMPONENT_NAME.asString(), "inputChannel")
73-
.hasTag(IntegrationObservation.ProducerTags.COMPONENT_TYPE.asString(), "producer")
74-
.hasKindEqualTo(Span.Kind.PRODUCER))
75-
.hasASpanWithName("dataHandler receive", spanAssert -> spanAssert
76-
.hasTag(IntegrationObservation.HandlerTags.COMPONENT_NAME.asString(), "dataHandler")
77-
.hasTag(IntegrationObservation.HandlerTags.COMPONENT_TYPE.asString(), "handler")
78-
.hasKindEqualTo(Span.Kind.CONSUMER))
79-
.hasASpanWithName("errorChannel send", spanAssert -> spanAssert
80-
.hasTag(IntegrationObservation.ProducerTags.COMPONENT_NAME.asString(), "errorChannel")
81-
.hasTag(IntegrationObservation.ProducerTags.COMPONENT_TYPE.asString(), "producer")
82-
.hasKindEqualTo(Span.Kind.PRODUCER))
83-
.hasASpanWithName("errorChannel.bridgeTo receive", spanAssert -> spanAssert
84-
.hasTag(IntegrationObservation.ProducerTags.COMPONENT_NAME.asString(), "errorChannel.bridgeTo")
85-
.hasTag(IntegrationObservation.ProducerTags.COMPONENT_TYPE.asString(), "handler")
86-
.hasKindEqualTo(Span.Kind.CONSUMER));
68+
.hasASpanWithName("dataMessageSource receive",
69+
spanAssert -> spanAssert
70+
.hasTag(IntegrationObservation.GatewayTags.COMPONENT_TYPE.asString(), "message-source")
71+
.hasKindEqualTo(Span.Kind.CONSUMER))
72+
.hasASpanWithName("inputChannel send",
73+
spanAssert -> spanAssert
74+
.hasTag(IntegrationObservation.ProducerTags.COMPONENT_TYPE.asString(), "producer")
75+
.hasKindEqualTo(Span.Kind.PRODUCER))
76+
.hasASpanWithName("dataHandler receive",
77+
spanAssert -> spanAssert
78+
.hasTag(IntegrationObservation.HandlerTags.COMPONENT_TYPE.asString(), "handler")
79+
.hasKindEqualTo(Span.Kind.CONSUMER))
80+
.hasASpanWithName("errorChannel send",
81+
spanAssert -> spanAssert
82+
.hasTag(IntegrationObservation.ProducerTags.COMPONENT_TYPE.asString(), "producer")
83+
.hasKindEqualTo(Span.Kind.PRODUCER))
84+
.hasASpanWithName("errorChannel.bridgeTo receive",
85+
spanAssert -> spanAssert
86+
.hasTag(IntegrationObservation.ProducerTags.COMPONENT_TYPE.asString(), "handler")
87+
.hasKindEqualTo(Span.Kind.CONSUMER));
8788
};
8889
}
8990

0 commit comments

Comments
 (0)