From 07dbde4fa782d21ec0911a70938264d4f39d52bd Mon Sep 17 00:00:00 2001 From: Trask Stalnaker Date: Wed, 30 Jul 2025 10:18:27 -0700 Subject: [PATCH 1/3] POC for severity_based log record processor --- .../fileconfig/DeclarativeConfiguration.java | 25 ++ ...edLogRecordProcessorComponentProvider.java | 72 +++++ ...toconfigure.spi.internal.ComponentProvider | 1 + ...gRecordProcessorComponentProviderTest.java | 148 ++++++++++ .../logs/SeverityBasedLogRecordProcessor.java | 70 +++++ ...everityBasedLogRecordProcessorBuilder.java | 66 +++++ .../SeverityBasedLogRecordProcessorTest.java | 270 ++++++++++++++++++ 7 files changed, 652 insertions(+) create mode 100644 sdk-extensions/incubator/src/main/java/io/opentelemetry/sdk/extension/incubator/fileconfig/internal/SeverityBasedLogRecordProcessorComponentProvider.java create mode 100644 sdk-extensions/incubator/src/test/java/io/opentelemetry/sdk/extension/incubator/fileconfig/SeverityBasedLogRecordProcessorComponentProviderTest.java create mode 100644 sdk/logs/src/main/java/io/opentelemetry/sdk/logs/SeverityBasedLogRecordProcessor.java create mode 100644 sdk/logs/src/main/java/io/opentelemetry/sdk/logs/SeverityBasedLogRecordProcessorBuilder.java create mode 100644 sdk/logs/src/test/java/io/opentelemetry/sdk/logs/SeverityBasedLogRecordProcessorTest.java diff --git a/sdk-extensions/incubator/src/main/java/io/opentelemetry/sdk/extension/incubator/fileconfig/DeclarativeConfiguration.java b/sdk-extensions/incubator/src/main/java/io/opentelemetry/sdk/extension/incubator/fileconfig/DeclarativeConfiguration.java index a4b336b8b94..542ca027c89 100644 --- a/sdk-extensions/incubator/src/main/java/io/opentelemetry/sdk/extension/incubator/fileconfig/DeclarativeConfiguration.java +++ b/sdk-extensions/incubator/src/main/java/io/opentelemetry/sdk/extension/incubator/fileconfig/DeclarativeConfiguration.java @@ -15,8 +15,10 @@ import io.opentelemetry.sdk.OpenTelemetrySdk; import io.opentelemetry.sdk.autoconfigure.internal.SpiHelper; import io.opentelemetry.sdk.autoconfigure.spi.internal.ComponentProvider; +import io.opentelemetry.sdk.extension.incubator.fileconfig.internal.model.LogRecordProcessorModel; import io.opentelemetry.sdk.extension.incubator.fileconfig.internal.model.OpenTelemetryConfigurationModel; import io.opentelemetry.sdk.extension.incubator.fileconfig.internal.model.SamplerModel; +import io.opentelemetry.sdk.logs.LogRecordProcessor; import io.opentelemetry.sdk.trace.samplers.Sampler; import java.io.Closeable; import java.io.IOException; @@ -210,6 +212,29 @@ public static Sampler createSampler(DeclarativeConfigProperties genericSamplerMo samplerModel); } + /** + * Create a {@link LogRecordProcessor} from the {@code logRecordProcessorModel} representing the + * log record processor config. + * + *

This is used when log record processors are composed, with one processor accepting one or + * more additional processors as config properties. The {@link ComponentProvider} implementation + * can call this to configure a delegate {@link LogRecordProcessor} from the {@link + * DeclarativeConfigProperties} corresponding to a particular config property. + */ + public static LogRecordProcessor createLogRecordProcessor( + DeclarativeConfigProperties genericLogRecordProcessorModel) { + YamlDeclarativeConfigProperties yamlDeclarativeConfigProperties = + requireYamlDeclarativeConfigProperties(genericLogRecordProcessorModel); + LogRecordProcessorModel logRecordProcessorModel = + MAPPER.convertValue( + DeclarativeConfigProperties.toMap(yamlDeclarativeConfigProperties), + LogRecordProcessorModel.class); + return createAndMaybeCleanup( + LogRecordProcessorFactory.getInstance(), + SpiHelper.create(yamlDeclarativeConfigProperties.getComponentLoader()), + logRecordProcessorModel); + } + private static YamlDeclarativeConfigProperties requireYamlDeclarativeConfigProperties( DeclarativeConfigProperties declarativeConfigProperties) { if (!(declarativeConfigProperties instanceof YamlDeclarativeConfigProperties)) { diff --git a/sdk-extensions/incubator/src/main/java/io/opentelemetry/sdk/extension/incubator/fileconfig/internal/SeverityBasedLogRecordProcessorComponentProvider.java b/sdk-extensions/incubator/src/main/java/io/opentelemetry/sdk/extension/incubator/fileconfig/internal/SeverityBasedLogRecordProcessorComponentProvider.java new file mode 100644 index 00000000000..f3b0a40ddbe --- /dev/null +++ b/sdk-extensions/incubator/src/main/java/io/opentelemetry/sdk/extension/incubator/fileconfig/internal/SeverityBasedLogRecordProcessorComponentProvider.java @@ -0,0 +1,72 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.sdk.extension.incubator.fileconfig.internal; + +import io.opentelemetry.api.incubator.config.DeclarativeConfigProperties; +import io.opentelemetry.api.logs.Severity; +import io.opentelemetry.sdk.autoconfigure.spi.internal.ComponentProvider; +import io.opentelemetry.sdk.extension.incubator.fileconfig.DeclarativeConfiguration; +import io.opentelemetry.sdk.logs.LogRecordProcessor; +import io.opentelemetry.sdk.logs.SeverityBasedLogRecordProcessor; +import java.util.ArrayList; +import java.util.List; + +/** + * ComponentProvider for SeverityBasedLogRecordProcessor to support declarative configuration. + * + *

This provider creates a {@link SeverityBasedLogRecordProcessor} that filters log records + * based on minimum severity level. Only log records with a severity level greater than or + * equal to the configured minimum are forwarded to the configured downstream processors. + * + *

This class is internal and is hence not for public use. Its APIs are unstable and can change + * at any time. + */ +public class SeverityBasedLogRecordProcessorComponentProvider + implements ComponentProvider { + + @Override + public Class getType() { + return LogRecordProcessor.class; + } + + @Override + public String getName() { + return "severity_based"; + } + + @Override + public LogRecordProcessor create(DeclarativeConfigProperties config) { + String minimumSeverityStr = config.getString("minimum_severity"); + if (minimumSeverityStr == null) { + throw new IllegalArgumentException( + "minimum_severity is required for severity_based log processors"); + } + + Severity minimumSeverity; + try { + minimumSeverity = Severity.valueOf(minimumSeverityStr); + } catch (IllegalArgumentException e) { + throw new IllegalArgumentException("Invalid severity value: " + minimumSeverityStr, e); + } + + List processorConfigs = config.getStructuredList("processors"); + if (processorConfigs == null || processorConfigs.isEmpty()) { + throw new IllegalArgumentException( + "At least one processor is required for severity_based log processors"); + } + + List processors = new ArrayList<>(); + for (DeclarativeConfigProperties processorConfig : processorConfigs) { + LogRecordProcessor processor = + DeclarativeConfiguration.createLogRecordProcessor(processorConfig); + processors.add(processor); + } + + return SeverityBasedLogRecordProcessor.builder(minimumSeverity) + .addProcessors(processors) + .build(); + } +} diff --git a/sdk-extensions/incubator/src/main/resources/META-INF/services/io.opentelemetry.sdk.autoconfigure.spi.internal.ComponentProvider b/sdk-extensions/incubator/src/main/resources/META-INF/services/io.opentelemetry.sdk.autoconfigure.spi.internal.ComponentProvider index a1a361a5f37..8de5b9f1e9a 100644 --- a/sdk-extensions/incubator/src/main/resources/META-INF/services/io.opentelemetry.sdk.autoconfigure.spi.internal.ComponentProvider +++ b/sdk-extensions/incubator/src/main/resources/META-INF/services/io.opentelemetry.sdk.autoconfigure.spi.internal.ComponentProvider @@ -1 +1,2 @@ io.opentelemetry.sdk.extension.incubator.fileconfig.ServiceResourceDetector +io.opentelemetry.sdk.extension.incubator.fileconfig.internal.SeverityBasedLogRecordProcessorComponentProvider diff --git a/sdk-extensions/incubator/src/test/java/io/opentelemetry/sdk/extension/incubator/fileconfig/SeverityBasedLogRecordProcessorComponentProviderTest.java b/sdk-extensions/incubator/src/test/java/io/opentelemetry/sdk/extension/incubator/fileconfig/SeverityBasedLogRecordProcessorComponentProviderTest.java new file mode 100644 index 00000000000..f8488accdb2 --- /dev/null +++ b/sdk-extensions/incubator/src/test/java/io/opentelemetry/sdk/extension/incubator/fileconfig/SeverityBasedLogRecordProcessorComponentProviderTest.java @@ -0,0 +1,148 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.sdk.extension.incubator.fileconfig; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import io.opentelemetry.api.incubator.config.DeclarativeConfigProperties; +import io.opentelemetry.common.ComponentLoader; +import io.opentelemetry.sdk.extension.incubator.fileconfig.internal.SeverityBasedLogRecordProcessorComponentProvider; +import io.opentelemetry.sdk.logs.LogRecordProcessor; +import io.opentelemetry.sdk.logs.SeverityBasedLogRecordProcessor; +import java.io.ByteArrayInputStream; +import java.nio.charset.StandardCharsets; +import java.util.Collections; +import org.junit.jupiter.api.Test; + +class SeverityBasedLogRecordProcessorComponentProviderTest { + + @Test + void createSeverityBasedProcessor_DirectComponentProvider() { + SeverityBasedLogRecordProcessorComponentProvider provider = + new SeverityBasedLogRecordProcessorComponentProvider(); + + assertThat(provider.getType()).isEqualTo(LogRecordProcessor.class); + assertThat(provider.getName()).isEqualTo("severity_based"); + } + + @Test + void createSeverityBasedProcessor_ValidConfig() { + DeclarativeConfigProperties config = + getConfig( + "minimum_severity: \"WARN\"\n" + + "processors:\n" + + " - simple:\n" + + " exporter:\n" + + " console: {}\n"); + + SeverityBasedLogRecordProcessorComponentProvider provider = + new SeverityBasedLogRecordProcessorComponentProvider(); + + LogRecordProcessor processor = provider.create(config); + + assertThat(processor).isInstanceOf(SeverityBasedLogRecordProcessor.class); + + assertThat(processor.toString()) + .contains("minimumSeverity=WARN") + .contains("delegate=SimpleLogRecordProcessor") + .contains("logRecordExporter=SystemOutLogRecordExporter"); + } + + @Test + void createSeverityBasedProcessor_MissingMinimumSeverity() { + DeclarativeConfigProperties config = + getConfig( + "processors:\n" // this comment exists only to influence spotless formatting + + " - simple:\n" + + " exporter:\n" + + " console: {}\n"); + + SeverityBasedLogRecordProcessorComponentProvider provider = + new SeverityBasedLogRecordProcessorComponentProvider(); + + assertThatThrownBy(() -> provider.create(config)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("minimum_severity is required for severity_based log processors"); + } + + @Test + void createSeverityBasedProcessor_InvalidSeverity() { + + DeclarativeConfigProperties config = + getConfig( + "minimum_severity: \"INVALID\"\n" + + "processors:\n" + + " - simple:\n" + + " exporter:\n" + + " console: {}\n"); + + SeverityBasedLogRecordProcessorComponentProvider provider = + new SeverityBasedLogRecordProcessorComponentProvider(); + + assertThatThrownBy(() -> provider.create(config)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Invalid severity value: INVALID"); + } + + @Test + void createSeverityBasedProcessor_MissingProcessors() { + DeclarativeConfigProperties config = getConfig(""); + + SeverityBasedLogRecordProcessorComponentProvider provider = + new SeverityBasedLogRecordProcessorComponentProvider(); + + assertThatThrownBy(() -> provider.create(config)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("minimum_severity is required for severity_based log processors"); + } + + @Test + void createSeverityBasedProcessor_EmptyProcessors() { + DeclarativeConfigProperties config = getConfig("minimum_severity: \"WARN\"\nprocessors: []\n"); + + SeverityBasedLogRecordProcessorComponentProvider provider = + new SeverityBasedLogRecordProcessorComponentProvider(); + + assertThatThrownBy(() -> provider.create(config)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("At least one processor is required for severity_based log processors"); + } + + @Test + void createSeverityBasedProcessor_MultipleProcessors() { + DeclarativeConfigProperties config = + getConfig( + "minimum_severity: \"INFO\"\n" + + "processors:\n" + + " - simple:\n" + + " exporter:\n" + + " console: {}\n" + + " - simple:\n" + + " exporter:\n" + + " console: {}\n"); + + SeverityBasedLogRecordProcessorComponentProvider provider = + new SeverityBasedLogRecordProcessorComponentProvider(); + + LogRecordProcessor processor = provider.create(config); + + assertThat(processor).isInstanceOf(SeverityBasedLogRecordProcessor.class); + assertThat(processor.toString()).contains("SeverityBasedLogRecordProcessor"); + } + + private static DeclarativeConfigProperties getConfig(String yaml) { + Object yamlObj = + DeclarativeConfiguration.loadYaml( + new ByteArrayInputStream(yaml.getBytes(StandardCharsets.UTF_8)), + Collections.emptyMap()); + + return DeclarativeConfiguration.toConfigProperties( + yamlObj, + ComponentLoader.forClassLoader( + SeverityBasedLogRecordProcessorComponentProviderTest.class.getClassLoader())); + } +} diff --git a/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/SeverityBasedLogRecordProcessor.java b/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/SeverityBasedLogRecordProcessor.java new file mode 100644 index 00000000000..f61350d6840 --- /dev/null +++ b/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/SeverityBasedLogRecordProcessor.java @@ -0,0 +1,70 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.sdk.logs; + +import static java.util.Objects.requireNonNull; + +import io.opentelemetry.api.logs.Severity; +import io.opentelemetry.context.Context; +import io.opentelemetry.sdk.common.CompletableResultCode; +import java.util.List; + +/** + * Implementation of {@link LogRecordProcessor} that filters log records based on minimum severity + * level and delegates to downstream processors. + * + *

This processor only forwards log records to downstream processors if the log record's severity + * level is greater than or equal to the configured minimum severity level. + */ +public final class SeverityBasedLogRecordProcessor implements LogRecordProcessor { + + private final Severity minimumSeverity; + private final LogRecordProcessor delegate; + + SeverityBasedLogRecordProcessor(Severity minimumSeverity, List processors) { + this.minimumSeverity = requireNonNull(minimumSeverity, "minimumSeverity"); + requireNonNull(processors, "processors"); + this.delegate = LogRecordProcessor.composite(processors); + } + + /** + * Returns a new {@link SeverityBasedLogRecordProcessorBuilder} to construct a {@link + * SeverityBasedLogRecordProcessor}. + * + * @param minimumSeverity the minimum severity level required for processing + * @return a new {@link SeverityBasedLogRecordProcessorBuilder} + */ + public static SeverityBasedLogRecordProcessorBuilder builder(Severity minimumSeverity) { + return new SeverityBasedLogRecordProcessorBuilder(minimumSeverity); + } + + @Override + public void onEmit(Context context, ReadWriteLogRecord logRecord) { + if (logRecord.getSeverity().getSeverityNumber() >= minimumSeverity.getSeverityNumber()) { + delegate.onEmit(context, logRecord); + } + } + + @Override + public CompletableResultCode shutdown() { + return delegate.shutdown(); + } + + @Override + public CompletableResultCode forceFlush() { + return delegate.forceFlush(); + } + + @Override + public String toString() { + return "SeverityBasedLogRecordProcessor{" + + "minimumSeverity=" + + minimumSeverity + + ", delegate=" + + delegate + + '}'; + } +} diff --git a/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/SeverityBasedLogRecordProcessorBuilder.java b/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/SeverityBasedLogRecordProcessorBuilder.java new file mode 100644 index 00000000000..622450592ba --- /dev/null +++ b/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/SeverityBasedLogRecordProcessorBuilder.java @@ -0,0 +1,66 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.sdk.logs; + +import static java.util.Objects.requireNonNull; + +import io.opentelemetry.api.logs.Severity; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +/** Builder class for {@link SeverityBasedLogRecordProcessor}. */ +public final class SeverityBasedLogRecordProcessorBuilder { + + private final Severity minimumSeverity; + private final List processors = new ArrayList<>(); + + SeverityBasedLogRecordProcessorBuilder(Severity minimumSeverity) { + this.minimumSeverity = requireNonNull(minimumSeverity, "minimumSeverity"); + } + + /** + * Adds multiple {@link LogRecordProcessor}s to the list of downstream processors. + * + * @param processors the processors to add + * @return this builder + */ + public SeverityBasedLogRecordProcessorBuilder addProcessors(LogRecordProcessor... processors) { + requireNonNull(processors, "processors"); + addProcessors(Arrays.asList(processors)); + return this; + } + + /** + * Adds multiple {@link LogRecordProcessor}s to the list of downstream processors. + * + * @param processors the processors to add + * @return this builder + */ + public SeverityBasedLogRecordProcessorBuilder addProcessors( + Iterable processors) { + + requireNonNull(processors, "processors"); + for (LogRecordProcessor processor : processors) { + requireNonNull(processor, "processor"); + this.processors.add(processor); + } + return this; + } + + /** + * Returns a new {@link SeverityBasedLogRecordProcessor} with the configuration of this builder. + * + * @return a new {@link SeverityBasedLogRecordProcessor} + * @throws IllegalArgumentException if no processors have been added + */ + public SeverityBasedLogRecordProcessor build() { + if (processors.isEmpty()) { + throw new IllegalArgumentException("At least one processor must be added"); + } + return new SeverityBasedLogRecordProcessor(minimumSeverity, processors); + } +} diff --git a/sdk/logs/src/test/java/io/opentelemetry/sdk/logs/SeverityBasedLogRecordProcessorTest.java b/sdk/logs/src/test/java/io/opentelemetry/sdk/logs/SeverityBasedLogRecordProcessorTest.java new file mode 100644 index 00000000000..263d32c2cc6 --- /dev/null +++ b/sdk/logs/src/test/java/io/opentelemetry/sdk/logs/SeverityBasedLogRecordProcessorTest.java @@ -0,0 +1,270 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.sdk.logs; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.same; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import io.opentelemetry.api.logs.Severity; +import io.opentelemetry.context.Context; +import io.opentelemetry.sdk.common.CompletableResultCode; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.mockito.junit.jupiter.MockitoSettings; +import org.mockito.quality.Strictness; + +@ExtendWith(MockitoExtension.class) +@MockitoSettings(strictness = Strictness.LENIENT) +class SeverityBasedLogRecordProcessorTest { + + @Mock private LogRecordProcessor processor1; + @Mock private LogRecordProcessor processor2; + @Mock private ReadWriteLogRecord logRecord; + + private Context context; + + @BeforeEach + void setUp() { + context = Context.current(); + when(processor1.shutdown()).thenReturn(CompletableResultCode.ofSuccess()); + when(processor1.forceFlush()).thenReturn(CompletableResultCode.ofSuccess()); + when(processor2.shutdown()).thenReturn(CompletableResultCode.ofSuccess()); + when(processor2.forceFlush()).thenReturn(CompletableResultCode.ofSuccess()); + } + + @Test + void builder_RequiresMinimumSeverity() { + assertThatThrownBy(() -> SeverityBasedLogRecordProcessor.builder(null)) + .isInstanceOf(NullPointerException.class) + .hasMessageContaining("minimumSeverity"); + } + + @Test + void builder_RequiresAtLeastOneProcessor() { + assertThatThrownBy(() -> SeverityBasedLogRecordProcessor.builder(Severity.INFO).build()) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("At least one processor must be added"); + } + + @Test + void builder_NullProcessor() { + assertThatThrownBy( + () -> + SeverityBasedLogRecordProcessor.builder(Severity.INFO) + .addProcessors((LogRecordProcessor) null)) + .isInstanceOf(NullPointerException.class) + .hasMessageContaining("processor"); + } + + @Test + void builder_NullProcessorArray() { + assertThatThrownBy( + () -> + SeverityBasedLogRecordProcessor.builder(Severity.INFO) + .addProcessors((LogRecordProcessor[]) null)) + .isInstanceOf(NullPointerException.class) + .hasMessageContaining("processors"); + } + + @Test + void builder_NullProcessorIterable() { + assertThatThrownBy( + () -> + SeverityBasedLogRecordProcessor.builder(Severity.INFO) + .addProcessors((Iterable) null)) + .isInstanceOf(NullPointerException.class) + .hasMessageContaining("processors"); + } + + @Test + void onEmit_SeverityMeetsMinimum_DelegatesToAllProcessors() { + when(logRecord.getSeverity()).thenReturn(Severity.WARN); + + SeverityBasedLogRecordProcessor processor = + SeverityBasedLogRecordProcessor.builder(Severity.WARN) + .addProcessors(processor1, processor2) + .build(); + + processor.onEmit(context, logRecord); + + verify(processor1).onEmit(same(context), same(logRecord)); + verify(processor2).onEmit(same(context), same(logRecord)); + } + + @Test + void onEmit_SeverityAboveMinimum_DelegatesToAllProcessors() { + when(logRecord.getSeverity()).thenReturn(Severity.ERROR); + + SeverityBasedLogRecordProcessor processor = + SeverityBasedLogRecordProcessor.builder(Severity.WARN) + .addProcessors(processor1, processor2) + .build(); + + processor.onEmit(context, logRecord); + + verify(processor1).onEmit(same(context), same(logRecord)); + verify(processor2).onEmit(same(context), same(logRecord)); + } + + @Test + void onEmit_SeverityBelowMinimum_DoesNotDelegate() { + when(logRecord.getSeverity()).thenReturn(Severity.DEBUG); + + SeverityBasedLogRecordProcessor processor = + SeverityBasedLogRecordProcessor.builder(Severity.WARN) + .addProcessors(processor1, processor2) + .build(); + + processor.onEmit(context, logRecord); + + verify(processor1, never()).onEmit(any(), any()); + verify(processor2, never()).onEmit(any(), any()); + } + + @Test + void onEmit_UndefinedSeverity_DoesNotDelegate() { + when(logRecord.getSeverity()).thenReturn(Severity.UNDEFINED_SEVERITY_NUMBER); + + SeverityBasedLogRecordProcessor processor = + SeverityBasedLogRecordProcessor.builder(Severity.INFO).addProcessors(processor1).build(); + + processor.onEmit(context, logRecord); + + verify(processor1, never()).onEmit(any(), any()); + } + + @Test + void onEmit_VariousSeverityLevels() { + SeverityBasedLogRecordProcessor processor = + SeverityBasedLogRecordProcessor.builder(Severity.WARN).addProcessors(processor1).build(); + + // Test all severity levels + testSeverityLevel(processor, Severity.UNDEFINED_SEVERITY_NUMBER, /* shouldDelegate= */ false); + testSeverityLevel(processor, Severity.TRACE, /* shouldDelegate= */ false); + testSeverityLevel(processor, Severity.TRACE2, /* shouldDelegate= */ false); + testSeverityLevel(processor, Severity.TRACE3, /* shouldDelegate= */ false); + testSeverityLevel(processor, Severity.TRACE4, /* shouldDelegate= */ false); + testSeverityLevel(processor, Severity.DEBUG, /* shouldDelegate= */ false); + testSeverityLevel(processor, Severity.DEBUG2, /* shouldDelegate= */ false); + testSeverityLevel(processor, Severity.DEBUG3, /* shouldDelegate= */ false); + testSeverityLevel(processor, Severity.DEBUG4, /* shouldDelegate= */ false); + testSeverityLevel(processor, Severity.INFO, /* shouldDelegate= */ false); + testSeverityLevel(processor, Severity.INFO2, /* shouldDelegate= */ false); + testSeverityLevel(processor, Severity.INFO3, /* shouldDelegate= */ false); + testSeverityLevel(processor, Severity.INFO4, /* shouldDelegate= */ false); + testSeverityLevel(processor, Severity.WARN, /* shouldDelegate= */ true); + testSeverityLevel(processor, Severity.WARN2, /* shouldDelegate= */ true); + testSeverityLevel(processor, Severity.WARN3, /* shouldDelegate= */ true); + testSeverityLevel(processor, Severity.WARN4, /* shouldDelegate= */ true); + testSeverityLevel(processor, Severity.ERROR, /* shouldDelegate= */ true); + testSeverityLevel(processor, Severity.ERROR2, /* shouldDelegate= */ true); + testSeverityLevel(processor, Severity.ERROR3, /* shouldDelegate= */ true); + testSeverityLevel(processor, Severity.ERROR4, /* shouldDelegate= */ true); + testSeverityLevel(processor, Severity.FATAL, /* shouldDelegate= */ true); + testSeverityLevel(processor, Severity.FATAL2, /* shouldDelegate= */ true); + testSeverityLevel(processor, Severity.FATAL3, /* shouldDelegate= */ true); + testSeverityLevel(processor, Severity.FATAL4, /* shouldDelegate= */ true); + } + + private void testSeverityLevel( + SeverityBasedLogRecordProcessor processor, Severity severity, boolean shouldDelegate) { + when(logRecord.getSeverity()).thenReturn(severity); + + processor.onEmit(context, logRecord); + + if (shouldDelegate) { + verify(processor1).onEmit(same(context), same(logRecord)); + } else { + verify(processor1, never()).onEmit(same(context), same(logRecord)); + } + + // Reset mock for next test + org.mockito.Mockito.reset(processor1); + when(processor1.shutdown()).thenReturn(CompletableResultCode.ofSuccess()); + when(processor1.forceFlush()).thenReturn(CompletableResultCode.ofSuccess()); + } + + @Test + void shutdown_DelegatesToAllProcessors() { + SeverityBasedLogRecordProcessor processor = + SeverityBasedLogRecordProcessor.builder(Severity.INFO) + .addProcessors(processor1, processor2) + .build(); + + CompletableResultCode result = processor.shutdown(); + + verify(processor1).shutdown(); + verify(processor2).shutdown(); + assertThat(result.isSuccess()).isTrue(); + } + + @Test + void forceFlush_DelegatesToAllProcessors() { + SeverityBasedLogRecordProcessor processor = + SeverityBasedLogRecordProcessor.builder(Severity.INFO) + .addProcessors(processor1, processor2) + .build(); + + CompletableResultCode result = processor.forceFlush(); + + verify(processor1).forceFlush(); + verify(processor2).forceFlush(); + assertThat(result.isSuccess()).isTrue(); + } + + @Test + void toString_Valid() { + SeverityBasedLogRecordProcessor processor = + SeverityBasedLogRecordProcessor.builder(Severity.WARN).addProcessors(processor1).build(); + + String toString = processor.toString(); + assertThat(toString).contains("SeverityBasedLogRecordProcessor"); + assertThat(toString).contains("minimumSeverity=WARN"); + assertThat(toString).contains("delegate="); + } + + @Test + void shutdown_ProcessorFailure() { + when(processor1.shutdown()).thenReturn(CompletableResultCode.ofFailure()); + when(processor2.shutdown()).thenReturn(CompletableResultCode.ofSuccess()); + + SeverityBasedLogRecordProcessor processor = + SeverityBasedLogRecordProcessor.builder(Severity.INFO) + .addProcessors(processor1, processor2) + .build(); + + CompletableResultCode result = processor.shutdown(); + + verify(processor1).shutdown(); + verify(processor2).shutdown(); + assertThat(result.isSuccess()).isFalse(); + } + + @Test + void forceFlush_ProcessorFailure() { + when(processor1.forceFlush()).thenReturn(CompletableResultCode.ofFailure()); + when(processor2.forceFlush()).thenReturn(CompletableResultCode.ofSuccess()); + + SeverityBasedLogRecordProcessor processor = + SeverityBasedLogRecordProcessor.builder(Severity.INFO) + .addProcessors(processor1, processor2) + .build(); + + CompletableResultCode result = processor.forceFlush(); + + verify(processor1).forceFlush(); + verify(processor2).forceFlush(); + assertThat(result.isSuccess()).isFalse(); + } +} From dd55ab80f1ddb9cec842af93eeadd9c1feaeebe4 Mon Sep 17 00:00:00 2001 From: Trask Stalnaker Date: Wed, 30 Jul 2025 12:05:21 -0700 Subject: [PATCH 2/3] POC for trace_based log record processor --- ...edLogRecordProcessorComponentProvider.java | 4 - ...edLogRecordProcessorComponentProvider.java | 52 ++++ ...toconfigure.spi.internal.ComponentProvider | 1 + ...gRecordProcessorComponentProviderTest.java | 110 ++++++++ .../logs/SeverityBasedLogRecordProcessor.java | 3 +- .../logs/TraceBasedLogRecordProcessor.java | 61 +++++ .../TraceBasedLogRecordProcessorBuilder.java | 62 +++++ .../TraceBasedLogRecordProcessorTest.java | 240 ++++++++++++++++++ 8 files changed, 527 insertions(+), 6 deletions(-) create mode 100644 sdk-extensions/incubator/src/main/java/io/opentelemetry/sdk/extension/incubator/fileconfig/internal/TraceBasedLogRecordProcessorComponentProvider.java create mode 100644 sdk-extensions/incubator/src/test/java/io/opentelemetry/sdk/extension/incubator/fileconfig/TraceBasedLogRecordProcessorComponentProviderTest.java create mode 100644 sdk/logs/src/main/java/io/opentelemetry/sdk/logs/TraceBasedLogRecordProcessor.java create mode 100644 sdk/logs/src/main/java/io/opentelemetry/sdk/logs/TraceBasedLogRecordProcessorBuilder.java create mode 100644 sdk/logs/src/test/java/io/opentelemetry/sdk/logs/TraceBasedLogRecordProcessorTest.java diff --git a/sdk-extensions/incubator/src/main/java/io/opentelemetry/sdk/extension/incubator/fileconfig/internal/SeverityBasedLogRecordProcessorComponentProvider.java b/sdk-extensions/incubator/src/main/java/io/opentelemetry/sdk/extension/incubator/fileconfig/internal/SeverityBasedLogRecordProcessorComponentProvider.java index f3b0a40ddbe..2f0ea4a599c 100644 --- a/sdk-extensions/incubator/src/main/java/io/opentelemetry/sdk/extension/incubator/fileconfig/internal/SeverityBasedLogRecordProcessorComponentProvider.java +++ b/sdk-extensions/incubator/src/main/java/io/opentelemetry/sdk/extension/incubator/fileconfig/internal/SeverityBasedLogRecordProcessorComponentProvider.java @@ -17,10 +17,6 @@ /** * ComponentProvider for SeverityBasedLogRecordProcessor to support declarative configuration. * - *

This provider creates a {@link SeverityBasedLogRecordProcessor} that filters log records - * based on minimum severity level. Only log records with a severity level greater than or - * equal to the configured minimum are forwarded to the configured downstream processors. - * *

This class is internal and is hence not for public use. Its APIs are unstable and can change * at any time. */ diff --git a/sdk-extensions/incubator/src/main/java/io/opentelemetry/sdk/extension/incubator/fileconfig/internal/TraceBasedLogRecordProcessorComponentProvider.java b/sdk-extensions/incubator/src/main/java/io/opentelemetry/sdk/extension/incubator/fileconfig/internal/TraceBasedLogRecordProcessorComponentProvider.java new file mode 100644 index 00000000000..ebf153e8a51 --- /dev/null +++ b/sdk-extensions/incubator/src/main/java/io/opentelemetry/sdk/extension/incubator/fileconfig/internal/TraceBasedLogRecordProcessorComponentProvider.java @@ -0,0 +1,52 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.sdk.extension.incubator.fileconfig.internal; + +import io.opentelemetry.api.incubator.config.DeclarativeConfigProperties; +import io.opentelemetry.sdk.autoconfigure.spi.internal.ComponentProvider; +import io.opentelemetry.sdk.extension.incubator.fileconfig.DeclarativeConfiguration; +import io.opentelemetry.sdk.logs.LogRecordProcessor; +import io.opentelemetry.sdk.logs.TraceBasedLogRecordProcessor; +import java.util.ArrayList; +import java.util.List; + +/** + * ComponentProvider for TraceBasedLogRecordProcessor to support declarative configuration. + * + *

This class is internal and is hence not for public use. Its APIs are unstable and can change + * at any time. + */ +public class TraceBasedLogRecordProcessorComponentProvider + implements ComponentProvider { + + @Override + public Class getType() { + return LogRecordProcessor.class; + } + + @Override + public String getName() { + return "trace_based"; + } + + @Override + public LogRecordProcessor create(DeclarativeConfigProperties config) { + List processorConfigs = config.getStructuredList("processors"); + if (processorConfigs == null || processorConfigs.isEmpty()) { + throw new IllegalArgumentException( + "At least one processor is required for trace_based log processors"); + } + + List processors = new ArrayList<>(); + for (DeclarativeConfigProperties processorConfig : processorConfigs) { + LogRecordProcessor processor = + DeclarativeConfiguration.createLogRecordProcessor(processorConfig); + processors.add(processor); + } + + return TraceBasedLogRecordProcessor.builder().addProcessors(processors).build(); + } +} diff --git a/sdk-extensions/incubator/src/main/resources/META-INF/services/io.opentelemetry.sdk.autoconfigure.spi.internal.ComponentProvider b/sdk-extensions/incubator/src/main/resources/META-INF/services/io.opentelemetry.sdk.autoconfigure.spi.internal.ComponentProvider index 8de5b9f1e9a..7f54952442e 100644 --- a/sdk-extensions/incubator/src/main/resources/META-INF/services/io.opentelemetry.sdk.autoconfigure.spi.internal.ComponentProvider +++ b/sdk-extensions/incubator/src/main/resources/META-INF/services/io.opentelemetry.sdk.autoconfigure.spi.internal.ComponentProvider @@ -1,2 +1,3 @@ io.opentelemetry.sdk.extension.incubator.fileconfig.ServiceResourceDetector io.opentelemetry.sdk.extension.incubator.fileconfig.internal.SeverityBasedLogRecordProcessorComponentProvider +io.opentelemetry.sdk.extension.incubator.fileconfig.internal.TraceBasedLogRecordProcessorComponentProvider diff --git a/sdk-extensions/incubator/src/test/java/io/opentelemetry/sdk/extension/incubator/fileconfig/TraceBasedLogRecordProcessorComponentProviderTest.java b/sdk-extensions/incubator/src/test/java/io/opentelemetry/sdk/extension/incubator/fileconfig/TraceBasedLogRecordProcessorComponentProviderTest.java new file mode 100644 index 00000000000..3e056850871 --- /dev/null +++ b/sdk-extensions/incubator/src/test/java/io/opentelemetry/sdk/extension/incubator/fileconfig/TraceBasedLogRecordProcessorComponentProviderTest.java @@ -0,0 +1,110 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.sdk.extension.incubator.fileconfig; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import io.opentelemetry.api.incubator.config.DeclarativeConfigProperties; +import io.opentelemetry.common.ComponentLoader; +import io.opentelemetry.sdk.extension.incubator.fileconfig.internal.TraceBasedLogRecordProcessorComponentProvider; +import io.opentelemetry.sdk.logs.LogRecordProcessor; +import io.opentelemetry.sdk.logs.TraceBasedLogRecordProcessor; +import java.io.ByteArrayInputStream; +import java.nio.charset.StandardCharsets; +import java.util.Collections; +import org.junit.jupiter.api.Test; + +class TraceBasedLogRecordProcessorComponentProviderTest { + + @Test + void createTraceBasedProcessor_DirectComponentProvider() { + TraceBasedLogRecordProcessorComponentProvider provider = + new TraceBasedLogRecordProcessorComponentProvider(); + + assertThat(provider.getType()).isEqualTo(LogRecordProcessor.class); + assertThat(provider.getName()).isEqualTo("trace_based"); + } + + @Test + void createTraceBasedProcessor_ValidConfig() { + DeclarativeConfigProperties config = + getConfig( + "processors:\n" // this comment exists only to influence spotless formatting + + " - simple:\n" + + " exporter:\n" + + " console: {}\n"); + + TraceBasedLogRecordProcessorComponentProvider provider = + new TraceBasedLogRecordProcessorComponentProvider(); + + LogRecordProcessor processor = provider.create(config); + + assertThat(processor).isInstanceOf(TraceBasedLogRecordProcessor.class); + + assertThat(processor.toString()) + .contains("TraceBasedLogRecordProcessor") + .contains("delegate=SimpleLogRecordProcessor") + .contains("logRecordExporter=SystemOutLogRecordExporter"); + } + + @Test + void createTraceBasedProcessor_MissingProcessors() { + DeclarativeConfigProperties config = getConfig(""); + + TraceBasedLogRecordProcessorComponentProvider provider = + new TraceBasedLogRecordProcessorComponentProvider(); + + assertThatThrownBy(() -> provider.create(config)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("At least one processor is required for trace_based log processors"); + } + + @Test + void createTraceBasedProcessor_EmptyProcessors() { + DeclarativeConfigProperties config = getConfig("processors: []\n"); + + TraceBasedLogRecordProcessorComponentProvider provider = + new TraceBasedLogRecordProcessorComponentProvider(); + + assertThatThrownBy(() -> provider.create(config)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("At least one processor is required for trace_based log processors"); + } + + @Test + void createTraceBasedProcessor_MultipleProcessors() { + DeclarativeConfigProperties config = + getConfig( + "processors:\n" + + " - simple:\n" + + " exporter:\n" + + " console: {}\n" + + " - simple:\n" + + " exporter:\n" + + " console: {}\n"); + + TraceBasedLogRecordProcessorComponentProvider provider = + new TraceBasedLogRecordProcessorComponentProvider(); + + LogRecordProcessor processor = provider.create(config); + + assertThat(processor).isInstanceOf(TraceBasedLogRecordProcessor.class); + assertThat(processor.toString()).contains("TraceBasedLogRecordProcessor"); + } + + private static DeclarativeConfigProperties getConfig(String yaml) { + Object yamlObj = + DeclarativeConfiguration.loadYaml( + new ByteArrayInputStream(yaml.getBytes(StandardCharsets.UTF_8)), + Collections.emptyMap()); + + return DeclarativeConfiguration.toConfigProperties( + yamlObj, + ComponentLoader.forClassLoader( + TraceBasedLogRecordProcessorComponentProviderTest.class.getClassLoader())); + } +} diff --git a/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/SeverityBasedLogRecordProcessor.java b/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/SeverityBasedLogRecordProcessor.java index f61350d6840..78b73f65335 100644 --- a/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/SeverityBasedLogRecordProcessor.java +++ b/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/SeverityBasedLogRecordProcessor.java @@ -16,8 +16,7 @@ * Implementation of {@link LogRecordProcessor} that filters log records based on minimum severity * level and delegates to downstream processors. * - *

This processor only forwards log records to downstream processors if the log record's severity - * level is greater than or equal to the configured minimum severity level. + *

Only log records with severity greater than or equal to the configured minimum are forwarded. */ public final class SeverityBasedLogRecordProcessor implements LogRecordProcessor { diff --git a/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/TraceBasedLogRecordProcessor.java b/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/TraceBasedLogRecordProcessor.java new file mode 100644 index 00000000000..c039c1b0e6e --- /dev/null +++ b/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/TraceBasedLogRecordProcessor.java @@ -0,0 +1,61 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.sdk.logs; + +import static java.util.Objects.requireNonNull; + +import io.opentelemetry.context.Context; +import io.opentelemetry.sdk.common.CompletableResultCode; +import java.util.List; + +/** + * A {@link LogRecordProcessor} that filters out log records associated with + * sampled out spans. + * + * Log records not tied to any span (invalid span context) are not sampled out. + */ +public final class TraceBasedLogRecordProcessor implements LogRecordProcessor { + + private final LogRecordProcessor delegate; + + TraceBasedLogRecordProcessor(List processors) { + requireNonNull(processors, "processors"); + this.delegate = LogRecordProcessor.composite(processors); + } + + /** + * Returns a new {@link TraceBasedLogRecordProcessorBuilder} to construct a {@link + * TraceBasedLogRecordProcessor}. + * + * @return a new {@link TraceBasedLogRecordProcessorBuilder} + */ + public static TraceBasedLogRecordProcessorBuilder builder() { + return new TraceBasedLogRecordProcessorBuilder(); + } + + @Override + public void onEmit(Context context, ReadWriteLogRecord logRecord) { + if (logRecord.getSpanContext().isValid() && !logRecord.getSpanContext().isSampled()) { + return; + } + delegate.onEmit(context, logRecord); + } + + @Override + public CompletableResultCode shutdown() { + return delegate.shutdown(); + } + + @Override + public CompletableResultCode forceFlush() { + return delegate.forceFlush(); + } + + @Override + public String toString() { + return "TraceBasedLogRecordProcessor{" + "delegate=" + delegate + '}'; + } +} diff --git a/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/TraceBasedLogRecordProcessorBuilder.java b/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/TraceBasedLogRecordProcessorBuilder.java new file mode 100644 index 00000000000..92c099ce0f7 --- /dev/null +++ b/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/TraceBasedLogRecordProcessorBuilder.java @@ -0,0 +1,62 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.sdk.logs; + +import static java.util.Objects.requireNonNull; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +/** Builder class for {@link TraceBasedLogRecordProcessor}. */ +public final class TraceBasedLogRecordProcessorBuilder { + + private final List processors = new ArrayList<>(); + + TraceBasedLogRecordProcessorBuilder() {} + + /** + * Adds multiple {@link LogRecordProcessor}s to the list of downstream processors. + * + * @param processors the processors to add + * @return this builder + */ + public TraceBasedLogRecordProcessorBuilder addProcessors(LogRecordProcessor... processors) { + requireNonNull(processors, "processors"); + addProcessors(Arrays.asList(processors)); + return this; + } + + /** + * Adds multiple {@link LogRecordProcessor}s to the list of downstream processors. + * + * @param processors the processors to add + * @return this builder + */ + public TraceBasedLogRecordProcessorBuilder addProcessors( + Iterable processors) { + + requireNonNull(processors, "processors"); + for (LogRecordProcessor processor : processors) { + requireNonNull(processor, "processor"); + this.processors.add(processor); + } + return this; + } + + /** + * Returns a new {@link TraceBasedLogRecordProcessor} with the configuration of this builder. + * + * @return a new {@link TraceBasedLogRecordProcessor} + * @throws IllegalArgumentException if no processors have been added + */ + public TraceBasedLogRecordProcessor build() { + if (processors.isEmpty()) { + throw new IllegalArgumentException("At least one processor must be added"); + } + return new TraceBasedLogRecordProcessor(processors); + } +} diff --git a/sdk/logs/src/test/java/io/opentelemetry/sdk/logs/TraceBasedLogRecordProcessorTest.java b/sdk/logs/src/test/java/io/opentelemetry/sdk/logs/TraceBasedLogRecordProcessorTest.java new file mode 100644 index 00000000000..38c8eb1f774 --- /dev/null +++ b/sdk/logs/src/test/java/io/opentelemetry/sdk/logs/TraceBasedLogRecordProcessorTest.java @@ -0,0 +1,240 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.sdk.logs; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.same; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import io.opentelemetry.api.trace.SpanContext; +import io.opentelemetry.api.trace.SpanId; +import io.opentelemetry.api.trace.TraceFlags; +import io.opentelemetry.api.trace.TraceId; +import io.opentelemetry.api.trace.TraceState; +import io.opentelemetry.context.Context; +import io.opentelemetry.sdk.common.CompletableResultCode; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.mockito.junit.jupiter.MockitoSettings; +import org.mockito.quality.Strictness; + +@ExtendWith(MockitoExtension.class) +@MockitoSettings(strictness = Strictness.LENIENT) +class TraceBasedLogRecordProcessorTest { + + @Mock private LogRecordProcessor processor1; + @Mock private LogRecordProcessor processor2; + @Mock private ReadWriteLogRecord logRecord; + + private Context context; + private SpanContext sampledSpanContext; + private SpanContext notSampledSpanContext; + private SpanContext invalidSpanContext; + + @BeforeEach + void setUp() { + context = Context.current(); + when(processor1.shutdown()).thenReturn(CompletableResultCode.ofSuccess()); + when(processor1.forceFlush()).thenReturn(CompletableResultCode.ofSuccess()); + when(processor2.shutdown()).thenReturn(CompletableResultCode.ofSuccess()); + when(processor2.forceFlush()).thenReturn(CompletableResultCode.ofSuccess()); + + // Create sampled span context + sampledSpanContext = + SpanContext.create( + TraceId.fromLongs(1, 2), + SpanId.fromLong(3), + TraceFlags.getSampled(), + TraceState.getDefault()); + + // Create not sampled span context + notSampledSpanContext = + SpanContext.create( + TraceId.fromLongs(1, 2), + SpanId.fromLong(3), + TraceFlags.getDefault(), + TraceState.getDefault()); + + // Create invalid span context + invalidSpanContext = SpanContext.getInvalid(); + } + + @Test + void builder_RequiresAtLeastOneProcessor() { + assertThatThrownBy(() -> TraceBasedLogRecordProcessor.builder().build()) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("At least one processor must be added"); + } + + @Test + void builder_NullProcessor() { + assertThatThrownBy( + () -> TraceBasedLogRecordProcessor.builder().addProcessors((LogRecordProcessor) null)) + .isInstanceOf(NullPointerException.class) + .hasMessageContaining("processor"); + } + + @Test + void builder_NullProcessorArray() { + assertThatThrownBy( + () -> TraceBasedLogRecordProcessor.builder().addProcessors((LogRecordProcessor[]) null)) + .isInstanceOf(NullPointerException.class) + .hasMessageContaining("processors"); + } + + @Test + void builder_NullProcessorIterable() { + assertThatThrownBy( + () -> + TraceBasedLogRecordProcessor.builder() + .addProcessors((Iterable) null)) + .isInstanceOf(NullPointerException.class) + .hasMessageContaining("processors"); + } + + @Test + void onEmit_SampledSpanContext_DelegatesToAllProcessors() { + when(logRecord.getSpanContext()).thenReturn(sampledSpanContext); + + TraceBasedLogRecordProcessor processor = + TraceBasedLogRecordProcessor.builder().addProcessors(processor1, processor2).build(); + + processor.onEmit(context, logRecord); + + verify(processor1).onEmit(same(context), same(logRecord)); + verify(processor2).onEmit(same(context), same(logRecord)); + } + + @Test + void onEmit_NotSampledSpanContext_DoesNotDelegate() { + when(logRecord.getSpanContext()).thenReturn(notSampledSpanContext); + + TraceBasedLogRecordProcessor processor = + TraceBasedLogRecordProcessor.builder().addProcessors(processor1, processor2).build(); + + processor.onEmit(context, logRecord); + + verify(processor1, never()).onEmit(any(), any()); + verify(processor2, never()).onEmit(any(), any()); + } + + @Test + void onEmit_InvalidSpanContext_DelegatesToProcessor() { + when(logRecord.getSpanContext()).thenReturn(invalidSpanContext); + + TraceBasedLogRecordProcessor processor = + TraceBasedLogRecordProcessor.builder().addProcessors(processor1, processor2).build(); + + processor.onEmit(context, logRecord); + + verify(processor1).onEmit(same(context), same(logRecord)); + verify(processor2).onEmit(same(context), same(logRecord)); + } + + @Test + void onEmit_VariousSpanContexts() { + TraceBasedLogRecordProcessor processor = + TraceBasedLogRecordProcessor.builder().addProcessors(processor1).build(); + + // Test sampled span context + testSpanContext(processor, sampledSpanContext, /* shouldDelegate= */ true); + + // Test not sampled span context + testSpanContext(processor, notSampledSpanContext, /* shouldDelegate= */ false); + + // Test invalid span context + testSpanContext(processor, invalidSpanContext, /* shouldDelegate= */ true); + } + + private void testSpanContext( + TraceBasedLogRecordProcessor processor, SpanContext spanContext, boolean shouldDelegate) { + when(logRecord.getSpanContext()).thenReturn(spanContext); + + processor.onEmit(context, logRecord); + + if (shouldDelegate) { + verify(processor1).onEmit(same(context), same(logRecord)); + } else { + verify(processor1, never()).onEmit(same(context), same(logRecord)); + } + + // Reset mock for next test + org.mockito.Mockito.reset(processor1); + when(processor1.shutdown()).thenReturn(CompletableResultCode.ofSuccess()); + when(processor1.forceFlush()).thenReturn(CompletableResultCode.ofSuccess()); + } + + @Test + void shutdown_DelegatesToAllProcessors() { + TraceBasedLogRecordProcessor processor = + TraceBasedLogRecordProcessor.builder().addProcessors(processor1, processor2).build(); + + CompletableResultCode result = processor.shutdown(); + + verify(processor1).shutdown(); + verify(processor2).shutdown(); + assertThat(result.isSuccess()).isTrue(); + } + + @Test + void forceFlush_DelegatesToAllProcessors() { + TraceBasedLogRecordProcessor processor = + TraceBasedLogRecordProcessor.builder().addProcessors(processor1, processor2).build(); + + CompletableResultCode result = processor.forceFlush(); + + verify(processor1).forceFlush(); + verify(processor2).forceFlush(); + assertThat(result.isSuccess()).isTrue(); + } + + @Test + void toString_Valid() { + TraceBasedLogRecordProcessor processor = + TraceBasedLogRecordProcessor.builder().addProcessors(processor1).build(); + + String toString = processor.toString(); + assertThat(toString).contains("TraceBasedLogRecordProcessor"); + assertThat(toString).contains("delegate="); + } + + @Test + void shutdown_ProcessorFailure() { + when(processor1.shutdown()).thenReturn(CompletableResultCode.ofFailure()); + when(processor2.shutdown()).thenReturn(CompletableResultCode.ofSuccess()); + + TraceBasedLogRecordProcessor processor = + TraceBasedLogRecordProcessor.builder().addProcessors(processor1, processor2).build(); + + CompletableResultCode result = processor.shutdown(); + + verify(processor1).shutdown(); + verify(processor2).shutdown(); + assertThat(result.isSuccess()).isFalse(); + } + + @Test + void forceFlush_ProcessorFailure() { + when(processor1.forceFlush()).thenReturn(CompletableResultCode.ofFailure()); + when(processor2.forceFlush()).thenReturn(CompletableResultCode.ofSuccess()); + + TraceBasedLogRecordProcessor processor = + TraceBasedLogRecordProcessor.builder().addProcessors(processor1, processor2).build(); + + CompletableResultCode result = processor.forceFlush(); + + verify(processor1).forceFlush(); + verify(processor2).forceFlush(); + assertThat(result.isSuccess()).isFalse(); + } +} From 066fdb8bfcbad7da42e7539f9cfdd2921c984645 Mon Sep 17 00:00:00 2001 From: Trask Stalnaker Date: Wed, 30 Jul 2025 13:12:11 -0700 Subject: [PATCH 3/3] Switch to single delegate processor --- ...edLogRecordProcessorComponentProvider.java | 20 +-- ...edLogRecordProcessorComponentProvider.java | 18 +-- ...gRecordProcessorComponentProviderTest.java | 55 +++----- ...gRecordProcessorComponentProviderTest.java | 35 ++--- .../logs/SeverityBasedLogRecordProcessor.java | 12 +- ...everityBasedLogRecordProcessorBuilder.java | 43 +----- .../logs/TraceBasedLogRecordProcessor.java | 18 ++- .../TraceBasedLogRecordProcessorBuilder.java | 43 +----- .../SeverityBasedLogRecordProcessorTest.java | 128 +++++------------- .../TraceBasedLogRecordProcessorTest.java | 111 +++++---------- 10 files changed, 128 insertions(+), 355 deletions(-) diff --git a/sdk-extensions/incubator/src/main/java/io/opentelemetry/sdk/extension/incubator/fileconfig/internal/SeverityBasedLogRecordProcessorComponentProvider.java b/sdk-extensions/incubator/src/main/java/io/opentelemetry/sdk/extension/incubator/fileconfig/internal/SeverityBasedLogRecordProcessorComponentProvider.java index 2f0ea4a599c..22f06fe495c 100644 --- a/sdk-extensions/incubator/src/main/java/io/opentelemetry/sdk/extension/incubator/fileconfig/internal/SeverityBasedLogRecordProcessorComponentProvider.java +++ b/sdk-extensions/incubator/src/main/java/io/opentelemetry/sdk/extension/incubator/fileconfig/internal/SeverityBasedLogRecordProcessorComponentProvider.java @@ -11,8 +11,6 @@ import io.opentelemetry.sdk.extension.incubator.fileconfig.DeclarativeConfiguration; import io.opentelemetry.sdk.logs.LogRecordProcessor; import io.opentelemetry.sdk.logs.SeverityBasedLogRecordProcessor; -import java.util.ArrayList; -import java.util.List; /** * ComponentProvider for SeverityBasedLogRecordProcessor to support declarative configuration. @@ -48,21 +46,13 @@ public LogRecordProcessor create(DeclarativeConfigProperties config) { throw new IllegalArgumentException("Invalid severity value: " + minimumSeverityStr, e); } - List processorConfigs = config.getStructuredList("processors"); - if (processorConfigs == null || processorConfigs.isEmpty()) { - throw new IllegalArgumentException( - "At least one processor is required for severity_based log processors"); + DeclarativeConfigProperties delegateConfig = config.getStructured("delegate"); + if (delegateConfig == null) { + throw new IllegalArgumentException("delegate is required for severity_based log processors"); } - List processors = new ArrayList<>(); - for (DeclarativeConfigProperties processorConfig : processorConfigs) { - LogRecordProcessor processor = - DeclarativeConfiguration.createLogRecordProcessor(processorConfig); - processors.add(processor); - } + LogRecordProcessor delegate = DeclarativeConfiguration.createLogRecordProcessor(delegateConfig); - return SeverityBasedLogRecordProcessor.builder(minimumSeverity) - .addProcessors(processors) - .build(); + return SeverityBasedLogRecordProcessor.builder(minimumSeverity, delegate).build(); } } diff --git a/sdk-extensions/incubator/src/main/java/io/opentelemetry/sdk/extension/incubator/fileconfig/internal/TraceBasedLogRecordProcessorComponentProvider.java b/sdk-extensions/incubator/src/main/java/io/opentelemetry/sdk/extension/incubator/fileconfig/internal/TraceBasedLogRecordProcessorComponentProvider.java index ebf153e8a51..5e8e5d5a4a0 100644 --- a/sdk-extensions/incubator/src/main/java/io/opentelemetry/sdk/extension/incubator/fileconfig/internal/TraceBasedLogRecordProcessorComponentProvider.java +++ b/sdk-extensions/incubator/src/main/java/io/opentelemetry/sdk/extension/incubator/fileconfig/internal/TraceBasedLogRecordProcessorComponentProvider.java @@ -10,8 +10,6 @@ import io.opentelemetry.sdk.extension.incubator.fileconfig.DeclarativeConfiguration; import io.opentelemetry.sdk.logs.LogRecordProcessor; import io.opentelemetry.sdk.logs.TraceBasedLogRecordProcessor; -import java.util.ArrayList; -import java.util.List; /** * ComponentProvider for TraceBasedLogRecordProcessor to support declarative configuration. @@ -34,19 +32,13 @@ public String getName() { @Override public LogRecordProcessor create(DeclarativeConfigProperties config) { - List processorConfigs = config.getStructuredList("processors"); - if (processorConfigs == null || processorConfigs.isEmpty()) { - throw new IllegalArgumentException( - "At least one processor is required for trace_based log processors"); + DeclarativeConfigProperties delegateConfig = config.getStructured("delegate"); + if (delegateConfig == null) { + throw new IllegalArgumentException("delegate is required for trace_based log processors"); } - List processors = new ArrayList<>(); - for (DeclarativeConfigProperties processorConfig : processorConfigs) { - LogRecordProcessor processor = - DeclarativeConfiguration.createLogRecordProcessor(processorConfig); - processors.add(processor); - } + LogRecordProcessor delegate = DeclarativeConfiguration.createLogRecordProcessor(delegateConfig); - return TraceBasedLogRecordProcessor.builder().addProcessors(processors).build(); + return TraceBasedLogRecordProcessor.builder(delegate).build(); } } diff --git a/sdk-extensions/incubator/src/test/java/io/opentelemetry/sdk/extension/incubator/fileconfig/SeverityBasedLogRecordProcessorComponentProviderTest.java b/sdk-extensions/incubator/src/test/java/io/opentelemetry/sdk/extension/incubator/fileconfig/SeverityBasedLogRecordProcessorComponentProviderTest.java index f8488accdb2..c1b72af89bf 100644 --- a/sdk-extensions/incubator/src/test/java/io/opentelemetry/sdk/extension/incubator/fileconfig/SeverityBasedLogRecordProcessorComponentProviderTest.java +++ b/sdk-extensions/incubator/src/test/java/io/opentelemetry/sdk/extension/incubator/fileconfig/SeverityBasedLogRecordProcessorComponentProviderTest.java @@ -34,10 +34,10 @@ void createSeverityBasedProcessor_ValidConfig() { DeclarativeConfigProperties config = getConfig( "minimum_severity: \"WARN\"\n" - + "processors:\n" - + " - simple:\n" - + " exporter:\n" - + " console: {}\n"); + + "delegate:\n" + + " simple:\n" + + " exporter:\n" + + " console: {}\n"); SeverityBasedLogRecordProcessorComponentProvider provider = new SeverityBasedLogRecordProcessorComponentProvider(); @@ -56,10 +56,10 @@ void createSeverityBasedProcessor_ValidConfig() { void createSeverityBasedProcessor_MissingMinimumSeverity() { DeclarativeConfigProperties config = getConfig( - "processors:\n" // this comment exists only to influence spotless formatting - + " - simple:\n" - + " exporter:\n" - + " console: {}\n"); + "delegate:\n" // this comment exists only to influence spotless formatting + + " simple:\n" + + " exporter:\n" + + " console: {}\n"); SeverityBasedLogRecordProcessorComponentProvider provider = new SeverityBasedLogRecordProcessorComponentProvider(); @@ -75,10 +75,10 @@ void createSeverityBasedProcessor_InvalidSeverity() { DeclarativeConfigProperties config = getConfig( "minimum_severity: \"INVALID\"\n" - + "processors:\n" - + " - simple:\n" - + " exporter:\n" - + " console: {}\n"); + + "delegate:\n" + + " simple:\n" + + " exporter:\n" + + " console: {}\n"); SeverityBasedLogRecordProcessorComponentProvider provider = new SeverityBasedLogRecordProcessorComponentProvider(); @@ -89,41 +89,26 @@ void createSeverityBasedProcessor_InvalidSeverity() { } @Test - void createSeverityBasedProcessor_MissingProcessors() { - DeclarativeConfigProperties config = getConfig(""); + void createSeverityBasedProcessor_MissingDelegate() { + DeclarativeConfigProperties config = getConfig("minimum_severity: \"WARN\"\n"); SeverityBasedLogRecordProcessorComponentProvider provider = new SeverityBasedLogRecordProcessorComponentProvider(); assertThatThrownBy(() -> provider.create(config)) .isInstanceOf(IllegalArgumentException.class) - .hasMessage("minimum_severity is required for severity_based log processors"); - } - - @Test - void createSeverityBasedProcessor_EmptyProcessors() { - DeclarativeConfigProperties config = getConfig("minimum_severity: \"WARN\"\nprocessors: []\n"); - - SeverityBasedLogRecordProcessorComponentProvider provider = - new SeverityBasedLogRecordProcessorComponentProvider(); - - assertThatThrownBy(() -> provider.create(config)) - .isInstanceOf(IllegalArgumentException.class) - .hasMessage("At least one processor is required for severity_based log processors"); + .hasMessage("delegate is required for severity_based log processors"); } @Test - void createSeverityBasedProcessor_MultipleProcessors() { + void createSeverityBasedProcessor_SingleDelegate() { DeclarativeConfigProperties config = getConfig( "minimum_severity: \"INFO\"\n" - + "processors:\n" - + " - simple:\n" - + " exporter:\n" - + " console: {}\n" - + " - simple:\n" - + " exporter:\n" - + " console: {}\n"); + + "delegate:\n" + + " simple:\n" + + " exporter:\n" + + " console: {}\n"); SeverityBasedLogRecordProcessorComponentProvider provider = new SeverityBasedLogRecordProcessorComponentProvider(); diff --git a/sdk-extensions/incubator/src/test/java/io/opentelemetry/sdk/extension/incubator/fileconfig/TraceBasedLogRecordProcessorComponentProviderTest.java b/sdk-extensions/incubator/src/test/java/io/opentelemetry/sdk/extension/incubator/fileconfig/TraceBasedLogRecordProcessorComponentProviderTest.java index 3e056850871..32094743bcc 100644 --- a/sdk-extensions/incubator/src/test/java/io/opentelemetry/sdk/extension/incubator/fileconfig/TraceBasedLogRecordProcessorComponentProviderTest.java +++ b/sdk-extensions/incubator/src/test/java/io/opentelemetry/sdk/extension/incubator/fileconfig/TraceBasedLogRecordProcessorComponentProviderTest.java @@ -33,10 +33,10 @@ void createTraceBasedProcessor_DirectComponentProvider() { void createTraceBasedProcessor_ValidConfig() { DeclarativeConfigProperties config = getConfig( - "processors:\n" // this comment exists only to influence spotless formatting - + " - simple:\n" - + " exporter:\n" - + " console: {}\n"); + "delegate:\n" // this comment exists only to influence spotless formatting + + " simple:\n" + + " exporter:\n" + + " console: {}\n"); TraceBasedLogRecordProcessorComponentProvider provider = new TraceBasedLogRecordProcessorComponentProvider(); @@ -52,7 +52,7 @@ void createTraceBasedProcessor_ValidConfig() { } @Test - void createTraceBasedProcessor_MissingProcessors() { + void createTraceBasedProcessor_MissingDelegate() { DeclarativeConfigProperties config = getConfig(""); TraceBasedLogRecordProcessorComponentProvider provider = @@ -60,32 +60,13 @@ void createTraceBasedProcessor_MissingProcessors() { assertThatThrownBy(() -> provider.create(config)) .isInstanceOf(IllegalArgumentException.class) - .hasMessage("At least one processor is required for trace_based log processors"); + .hasMessage("delegate is required for trace_based log processors"); } @Test - void createTraceBasedProcessor_EmptyProcessors() { - DeclarativeConfigProperties config = getConfig("processors: []\n"); - - TraceBasedLogRecordProcessorComponentProvider provider = - new TraceBasedLogRecordProcessorComponentProvider(); - - assertThatThrownBy(() -> provider.create(config)) - .isInstanceOf(IllegalArgumentException.class) - .hasMessage("At least one processor is required for trace_based log processors"); - } - - @Test - void createTraceBasedProcessor_MultipleProcessors() { + void createTraceBasedProcessor_SingleDelegate() { DeclarativeConfigProperties config = - getConfig( - "processors:\n" - + " - simple:\n" - + " exporter:\n" - + " console: {}\n" - + " - simple:\n" - + " exporter:\n" - + " console: {}\n"); + getConfig("delegate:\n" + " simple:\n" + " exporter:\n" + " console: {}\n"); TraceBasedLogRecordProcessorComponentProvider provider = new TraceBasedLogRecordProcessorComponentProvider(); diff --git a/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/SeverityBasedLogRecordProcessor.java b/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/SeverityBasedLogRecordProcessor.java index 78b73f65335..d1befb23694 100644 --- a/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/SeverityBasedLogRecordProcessor.java +++ b/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/SeverityBasedLogRecordProcessor.java @@ -10,7 +10,6 @@ import io.opentelemetry.api.logs.Severity; import io.opentelemetry.context.Context; import io.opentelemetry.sdk.common.CompletableResultCode; -import java.util.List; /** * Implementation of {@link LogRecordProcessor} that filters log records based on minimum severity @@ -23,10 +22,9 @@ public final class SeverityBasedLogRecordProcessor implements LogRecordProcessor private final Severity minimumSeverity; private final LogRecordProcessor delegate; - SeverityBasedLogRecordProcessor(Severity minimumSeverity, List processors) { + SeverityBasedLogRecordProcessor(Severity minimumSeverity, LogRecordProcessor delegate) { this.minimumSeverity = requireNonNull(minimumSeverity, "minimumSeverity"); - requireNonNull(processors, "processors"); - this.delegate = LogRecordProcessor.composite(processors); + this.delegate = requireNonNull(delegate, "delegate"); } /** @@ -34,10 +32,12 @@ public final class SeverityBasedLogRecordProcessor implements LogRecordProcessor * SeverityBasedLogRecordProcessor}. * * @param minimumSeverity the minimum severity level required for processing + * @param delegate the processor to delegate to * @return a new {@link SeverityBasedLogRecordProcessorBuilder} */ - public static SeverityBasedLogRecordProcessorBuilder builder(Severity minimumSeverity) { - return new SeverityBasedLogRecordProcessorBuilder(minimumSeverity); + public static SeverityBasedLogRecordProcessorBuilder builder( + Severity minimumSeverity, LogRecordProcessor delegate) { + return new SeverityBasedLogRecordProcessorBuilder(minimumSeverity, delegate); } @Override diff --git a/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/SeverityBasedLogRecordProcessorBuilder.java b/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/SeverityBasedLogRecordProcessorBuilder.java index 622450592ba..9e99eedd14e 100644 --- a/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/SeverityBasedLogRecordProcessorBuilder.java +++ b/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/SeverityBasedLogRecordProcessorBuilder.java @@ -8,59 +8,24 @@ import static java.util.Objects.requireNonNull; import io.opentelemetry.api.logs.Severity; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; /** Builder class for {@link SeverityBasedLogRecordProcessor}. */ public final class SeverityBasedLogRecordProcessorBuilder { private final Severity minimumSeverity; - private final List processors = new ArrayList<>(); + private final LogRecordProcessor delegate; - SeverityBasedLogRecordProcessorBuilder(Severity minimumSeverity) { + SeverityBasedLogRecordProcessorBuilder(Severity minimumSeverity, LogRecordProcessor delegate) { this.minimumSeverity = requireNonNull(minimumSeverity, "minimumSeverity"); - } - - /** - * Adds multiple {@link LogRecordProcessor}s to the list of downstream processors. - * - * @param processors the processors to add - * @return this builder - */ - public SeverityBasedLogRecordProcessorBuilder addProcessors(LogRecordProcessor... processors) { - requireNonNull(processors, "processors"); - addProcessors(Arrays.asList(processors)); - return this; - } - - /** - * Adds multiple {@link LogRecordProcessor}s to the list of downstream processors. - * - * @param processors the processors to add - * @return this builder - */ - public SeverityBasedLogRecordProcessorBuilder addProcessors( - Iterable processors) { - - requireNonNull(processors, "processors"); - for (LogRecordProcessor processor : processors) { - requireNonNull(processor, "processor"); - this.processors.add(processor); - } - return this; + this.delegate = requireNonNull(delegate, "delegate"); } /** * Returns a new {@link SeverityBasedLogRecordProcessor} with the configuration of this builder. * * @return a new {@link SeverityBasedLogRecordProcessor} - * @throws IllegalArgumentException if no processors have been added */ public SeverityBasedLogRecordProcessor build() { - if (processors.isEmpty()) { - throw new IllegalArgumentException("At least one processor must be added"); - } - return new SeverityBasedLogRecordProcessor(minimumSeverity, processors); + return new SeverityBasedLogRecordProcessor(minimumSeverity, delegate); } } diff --git a/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/TraceBasedLogRecordProcessor.java b/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/TraceBasedLogRecordProcessor.java index c039c1b0e6e..c7130f1a568 100644 --- a/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/TraceBasedLogRecordProcessor.java +++ b/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/TraceBasedLogRecordProcessor.java @@ -9,31 +9,29 @@ import io.opentelemetry.context.Context; import io.opentelemetry.sdk.common.CompletableResultCode; -import java.util.List; /** - * A {@link LogRecordProcessor} that filters out log records associated with - * sampled out spans. - * - * Log records not tied to any span (invalid span context) are not sampled out. + * A {@link LogRecordProcessor} that filters out log records associated with sampled out spans. + * + *

Log records not tied to any span (invalid span context) are not sampled out. */ public final class TraceBasedLogRecordProcessor implements LogRecordProcessor { private final LogRecordProcessor delegate; - TraceBasedLogRecordProcessor(List processors) { - requireNonNull(processors, "processors"); - this.delegate = LogRecordProcessor.composite(processors); + TraceBasedLogRecordProcessor(LogRecordProcessor delegate) { + this.delegate = requireNonNull(delegate, "delegate"); } /** * Returns a new {@link TraceBasedLogRecordProcessorBuilder} to construct a {@link * TraceBasedLogRecordProcessor}. * + * @param delegate the processor to delegate to * @return a new {@link TraceBasedLogRecordProcessorBuilder} */ - public static TraceBasedLogRecordProcessorBuilder builder() { - return new TraceBasedLogRecordProcessorBuilder(); + public static TraceBasedLogRecordProcessorBuilder builder(LogRecordProcessor delegate) { + return new TraceBasedLogRecordProcessorBuilder(delegate); } @Override diff --git a/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/TraceBasedLogRecordProcessorBuilder.java b/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/TraceBasedLogRecordProcessorBuilder.java index 92c099ce0f7..a8d9f505692 100644 --- a/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/TraceBasedLogRecordProcessorBuilder.java +++ b/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/TraceBasedLogRecordProcessorBuilder.java @@ -7,56 +7,21 @@ import static java.util.Objects.requireNonNull; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; - /** Builder class for {@link TraceBasedLogRecordProcessor}. */ public final class TraceBasedLogRecordProcessorBuilder { - private final List processors = new ArrayList<>(); - - TraceBasedLogRecordProcessorBuilder() {} - - /** - * Adds multiple {@link LogRecordProcessor}s to the list of downstream processors. - * - * @param processors the processors to add - * @return this builder - */ - public TraceBasedLogRecordProcessorBuilder addProcessors(LogRecordProcessor... processors) { - requireNonNull(processors, "processors"); - addProcessors(Arrays.asList(processors)); - return this; - } - - /** - * Adds multiple {@link LogRecordProcessor}s to the list of downstream processors. - * - * @param processors the processors to add - * @return this builder - */ - public TraceBasedLogRecordProcessorBuilder addProcessors( - Iterable processors) { + private final LogRecordProcessor delegate; - requireNonNull(processors, "processors"); - for (LogRecordProcessor processor : processors) { - requireNonNull(processor, "processor"); - this.processors.add(processor); - } - return this; + TraceBasedLogRecordProcessorBuilder(LogRecordProcessor delegate) { + this.delegate = requireNonNull(delegate, "delegate"); } /** * Returns a new {@link TraceBasedLogRecordProcessor} with the configuration of this builder. * * @return a new {@link TraceBasedLogRecordProcessor} - * @throws IllegalArgumentException if no processors have been added */ public TraceBasedLogRecordProcessor build() { - if (processors.isEmpty()) { - throw new IllegalArgumentException("At least one processor must be added"); - } - return new TraceBasedLogRecordProcessor(processors); + return new TraceBasedLogRecordProcessor(delegate); } } diff --git a/sdk/logs/src/test/java/io/opentelemetry/sdk/logs/SeverityBasedLogRecordProcessorTest.java b/sdk/logs/src/test/java/io/opentelemetry/sdk/logs/SeverityBasedLogRecordProcessorTest.java index 263d32c2cc6..d00b3f295b2 100644 --- a/sdk/logs/src/test/java/io/opentelemetry/sdk/logs/SeverityBasedLogRecordProcessorTest.java +++ b/sdk/logs/src/test/java/io/opentelemetry/sdk/logs/SeverityBasedLogRecordProcessorTest.java @@ -28,8 +28,7 @@ @MockitoSettings(strictness = Strictness.LENIENT) class SeverityBasedLogRecordProcessorTest { - @Mock private LogRecordProcessor processor1; - @Mock private LogRecordProcessor processor2; + @Mock private LogRecordProcessor delegate; @Mock private ReadWriteLogRecord logRecord; private Context context; @@ -37,84 +36,46 @@ class SeverityBasedLogRecordProcessorTest { @BeforeEach void setUp() { context = Context.current(); - when(processor1.shutdown()).thenReturn(CompletableResultCode.ofSuccess()); - when(processor1.forceFlush()).thenReturn(CompletableResultCode.ofSuccess()); - when(processor2.shutdown()).thenReturn(CompletableResultCode.ofSuccess()); - when(processor2.forceFlush()).thenReturn(CompletableResultCode.ofSuccess()); + when(delegate.shutdown()).thenReturn(CompletableResultCode.ofSuccess()); + when(delegate.forceFlush()).thenReturn(CompletableResultCode.ofSuccess()); } @Test void builder_RequiresMinimumSeverity() { - assertThatThrownBy(() -> SeverityBasedLogRecordProcessor.builder(null)) + assertThatThrownBy(() -> SeverityBasedLogRecordProcessor.builder(null, delegate)) .isInstanceOf(NullPointerException.class) .hasMessageContaining("minimumSeverity"); } @Test - void builder_RequiresAtLeastOneProcessor() { - assertThatThrownBy(() -> SeverityBasedLogRecordProcessor.builder(Severity.INFO).build()) - .isInstanceOf(IllegalArgumentException.class) - .hasMessageContaining("At least one processor must be added"); - } - - @Test - void builder_NullProcessor() { - assertThatThrownBy( - () -> - SeverityBasedLogRecordProcessor.builder(Severity.INFO) - .addProcessors((LogRecordProcessor) null)) - .isInstanceOf(NullPointerException.class) - .hasMessageContaining("processor"); - } - - @Test - void builder_NullProcessorArray() { - assertThatThrownBy( - () -> - SeverityBasedLogRecordProcessor.builder(Severity.INFO) - .addProcessors((LogRecordProcessor[]) null)) - .isInstanceOf(NullPointerException.class) - .hasMessageContaining("processors"); - } - - @Test - void builder_NullProcessorIterable() { - assertThatThrownBy( - () -> - SeverityBasedLogRecordProcessor.builder(Severity.INFO) - .addProcessors((Iterable) null)) + void builder_RequiresProcessor() { + assertThatThrownBy(() -> SeverityBasedLogRecordProcessor.builder(Severity.INFO, null)) .isInstanceOf(NullPointerException.class) - .hasMessageContaining("processors"); + .hasMessageContaining("delegate"); } @Test - void onEmit_SeverityMeetsMinimum_DelegatesToAllProcessors() { + void onEmit_SeverityMeetsMinimum_DelegatesToProcessor() { when(logRecord.getSeverity()).thenReturn(Severity.WARN); SeverityBasedLogRecordProcessor processor = - SeverityBasedLogRecordProcessor.builder(Severity.WARN) - .addProcessors(processor1, processor2) - .build(); + SeverityBasedLogRecordProcessor.builder(Severity.WARN, delegate).build(); processor.onEmit(context, logRecord); - verify(processor1).onEmit(same(context), same(logRecord)); - verify(processor2).onEmit(same(context), same(logRecord)); + verify(delegate).onEmit(same(context), same(logRecord)); } @Test - void onEmit_SeverityAboveMinimum_DelegatesToAllProcessors() { + void onEmit_SeverityAboveMinimum_DelegatesToProcessor() { when(logRecord.getSeverity()).thenReturn(Severity.ERROR); SeverityBasedLogRecordProcessor processor = - SeverityBasedLogRecordProcessor.builder(Severity.WARN) - .addProcessors(processor1, processor2) - .build(); + SeverityBasedLogRecordProcessor.builder(Severity.WARN, delegate).build(); processor.onEmit(context, logRecord); - verify(processor1).onEmit(same(context), same(logRecord)); - verify(processor2).onEmit(same(context), same(logRecord)); + verify(delegate).onEmit(same(context), same(logRecord)); } @Test @@ -122,14 +83,11 @@ void onEmit_SeverityBelowMinimum_DoesNotDelegate() { when(logRecord.getSeverity()).thenReturn(Severity.DEBUG); SeverityBasedLogRecordProcessor processor = - SeverityBasedLogRecordProcessor.builder(Severity.WARN) - .addProcessors(processor1, processor2) - .build(); + SeverityBasedLogRecordProcessor.builder(Severity.WARN, delegate).build(); processor.onEmit(context, logRecord); - verify(processor1, never()).onEmit(any(), any()); - verify(processor2, never()).onEmit(any(), any()); + verify(delegate, never()).onEmit(any(), any()); } @Test @@ -137,17 +95,17 @@ void onEmit_UndefinedSeverity_DoesNotDelegate() { when(logRecord.getSeverity()).thenReturn(Severity.UNDEFINED_SEVERITY_NUMBER); SeverityBasedLogRecordProcessor processor = - SeverityBasedLogRecordProcessor.builder(Severity.INFO).addProcessors(processor1).build(); + SeverityBasedLogRecordProcessor.builder(Severity.INFO, delegate).build(); processor.onEmit(context, logRecord); - verify(processor1, never()).onEmit(any(), any()); + verify(delegate, never()).onEmit(any(), any()); } @Test void onEmit_VariousSeverityLevels() { SeverityBasedLogRecordProcessor processor = - SeverityBasedLogRecordProcessor.builder(Severity.WARN).addProcessors(processor1).build(); + SeverityBasedLogRecordProcessor.builder(Severity.WARN, delegate).build(); // Test all severity levels testSeverityLevel(processor, Severity.UNDEFINED_SEVERITY_NUMBER, /* shouldDelegate= */ false); @@ -184,49 +142,43 @@ private void testSeverityLevel( processor.onEmit(context, logRecord); if (shouldDelegate) { - verify(processor1).onEmit(same(context), same(logRecord)); + verify(delegate).onEmit(same(context), same(logRecord)); } else { - verify(processor1, never()).onEmit(same(context), same(logRecord)); + verify(delegate, never()).onEmit(same(context), same(logRecord)); } // Reset mock for next test - org.mockito.Mockito.reset(processor1); - when(processor1.shutdown()).thenReturn(CompletableResultCode.ofSuccess()); - when(processor1.forceFlush()).thenReturn(CompletableResultCode.ofSuccess()); + org.mockito.Mockito.reset(delegate); + when(delegate.shutdown()).thenReturn(CompletableResultCode.ofSuccess()); + when(delegate.forceFlush()).thenReturn(CompletableResultCode.ofSuccess()); } @Test - void shutdown_DelegatesToAllProcessors() { + void shutdown_DelegatesToProcessor() { SeverityBasedLogRecordProcessor processor = - SeverityBasedLogRecordProcessor.builder(Severity.INFO) - .addProcessors(processor1, processor2) - .build(); + SeverityBasedLogRecordProcessor.builder(Severity.INFO, delegate).build(); CompletableResultCode result = processor.shutdown(); - verify(processor1).shutdown(); - verify(processor2).shutdown(); + verify(delegate).shutdown(); assertThat(result.isSuccess()).isTrue(); } @Test - void forceFlush_DelegatesToAllProcessors() { + void forceFlush_DelegatesToProcessor() { SeverityBasedLogRecordProcessor processor = - SeverityBasedLogRecordProcessor.builder(Severity.INFO) - .addProcessors(processor1, processor2) - .build(); + SeverityBasedLogRecordProcessor.builder(Severity.INFO, delegate).build(); CompletableResultCode result = processor.forceFlush(); - verify(processor1).forceFlush(); - verify(processor2).forceFlush(); + verify(delegate).forceFlush(); assertThat(result.isSuccess()).isTrue(); } @Test void toString_Valid() { SeverityBasedLogRecordProcessor processor = - SeverityBasedLogRecordProcessor.builder(Severity.WARN).addProcessors(processor1).build(); + SeverityBasedLogRecordProcessor.builder(Severity.WARN, delegate).build(); String toString = processor.toString(); assertThat(toString).contains("SeverityBasedLogRecordProcessor"); @@ -236,35 +188,27 @@ void toString_Valid() { @Test void shutdown_ProcessorFailure() { - when(processor1.shutdown()).thenReturn(CompletableResultCode.ofFailure()); - when(processor2.shutdown()).thenReturn(CompletableResultCode.ofSuccess()); + when(delegate.shutdown()).thenReturn(CompletableResultCode.ofFailure()); SeverityBasedLogRecordProcessor processor = - SeverityBasedLogRecordProcessor.builder(Severity.INFO) - .addProcessors(processor1, processor2) - .build(); + SeverityBasedLogRecordProcessor.builder(Severity.INFO, delegate).build(); CompletableResultCode result = processor.shutdown(); - verify(processor1).shutdown(); - verify(processor2).shutdown(); + verify(delegate).shutdown(); assertThat(result.isSuccess()).isFalse(); } @Test void forceFlush_ProcessorFailure() { - when(processor1.forceFlush()).thenReturn(CompletableResultCode.ofFailure()); - when(processor2.forceFlush()).thenReturn(CompletableResultCode.ofSuccess()); + when(delegate.forceFlush()).thenReturn(CompletableResultCode.ofFailure()); SeverityBasedLogRecordProcessor processor = - SeverityBasedLogRecordProcessor.builder(Severity.INFO) - .addProcessors(processor1, processor2) - .build(); + SeverityBasedLogRecordProcessor.builder(Severity.INFO, delegate).build(); CompletableResultCode result = processor.forceFlush(); - verify(processor1).forceFlush(); - verify(processor2).forceFlush(); + verify(delegate).forceFlush(); assertThat(result.isSuccess()).isFalse(); } } diff --git a/sdk/logs/src/test/java/io/opentelemetry/sdk/logs/TraceBasedLogRecordProcessorTest.java b/sdk/logs/src/test/java/io/opentelemetry/sdk/logs/TraceBasedLogRecordProcessorTest.java index 38c8eb1f774..baa46a804ba 100644 --- a/sdk/logs/src/test/java/io/opentelemetry/sdk/logs/TraceBasedLogRecordProcessorTest.java +++ b/sdk/logs/src/test/java/io/opentelemetry/sdk/logs/TraceBasedLogRecordProcessorTest.java @@ -32,8 +32,7 @@ @MockitoSettings(strictness = Strictness.LENIENT) class TraceBasedLogRecordProcessorTest { - @Mock private LogRecordProcessor processor1; - @Mock private LogRecordProcessor processor2; + @Mock private LogRecordProcessor delegate; @Mock private ReadWriteLogRecord logRecord; private Context context; @@ -44,10 +43,8 @@ class TraceBasedLogRecordProcessorTest { @BeforeEach void setUp() { context = Context.current(); - when(processor1.shutdown()).thenReturn(CompletableResultCode.ofSuccess()); - when(processor1.forceFlush()).thenReturn(CompletableResultCode.ofSuccess()); - when(processor2.shutdown()).thenReturn(CompletableResultCode.ofSuccess()); - when(processor2.forceFlush()).thenReturn(CompletableResultCode.ofSuccess()); + when(delegate.shutdown()).thenReturn(CompletableResultCode.ofSuccess()); + when(delegate.forceFlush()).thenReturn(CompletableResultCode.ofSuccess()); // Create sampled span context sampledSpanContext = @@ -70,81 +67,48 @@ void setUp() { } @Test - void builder_RequiresAtLeastOneProcessor() { - assertThatThrownBy(() -> TraceBasedLogRecordProcessor.builder().build()) - .isInstanceOf(IllegalArgumentException.class) - .hasMessageContaining("At least one processor must be added"); - } - - @Test - void builder_NullProcessor() { - assertThatThrownBy( - () -> TraceBasedLogRecordProcessor.builder().addProcessors((LogRecordProcessor) null)) - .isInstanceOf(NullPointerException.class) - .hasMessageContaining("processor"); - } - - @Test - void builder_NullProcessorArray() { - assertThatThrownBy( - () -> TraceBasedLogRecordProcessor.builder().addProcessors((LogRecordProcessor[]) null)) - .isInstanceOf(NullPointerException.class) - .hasMessageContaining("processors"); - } - - @Test - void builder_NullProcessorIterable() { - assertThatThrownBy( - () -> - TraceBasedLogRecordProcessor.builder() - .addProcessors((Iterable) null)) + void builder_RequiresProcessor() { + assertThatThrownBy(() -> TraceBasedLogRecordProcessor.builder(null)) .isInstanceOf(NullPointerException.class) - .hasMessageContaining("processors"); + .hasMessageContaining("delegate"); } @Test - void onEmit_SampledSpanContext_DelegatesToAllProcessors() { + void onEmit_SampledSpanContext_DelegatesToProcessor() { when(logRecord.getSpanContext()).thenReturn(sampledSpanContext); - TraceBasedLogRecordProcessor processor = - TraceBasedLogRecordProcessor.builder().addProcessors(processor1, processor2).build(); + TraceBasedLogRecordProcessor processor = TraceBasedLogRecordProcessor.builder(delegate).build(); processor.onEmit(context, logRecord); - verify(processor1).onEmit(same(context), same(logRecord)); - verify(processor2).onEmit(same(context), same(logRecord)); + verify(delegate).onEmit(same(context), same(logRecord)); } @Test void onEmit_NotSampledSpanContext_DoesNotDelegate() { when(logRecord.getSpanContext()).thenReturn(notSampledSpanContext); - TraceBasedLogRecordProcessor processor = - TraceBasedLogRecordProcessor.builder().addProcessors(processor1, processor2).build(); + TraceBasedLogRecordProcessor processor = TraceBasedLogRecordProcessor.builder(delegate).build(); processor.onEmit(context, logRecord); - verify(processor1, never()).onEmit(any(), any()); - verify(processor2, never()).onEmit(any(), any()); + verify(delegate, never()).onEmit(any(), any()); } @Test void onEmit_InvalidSpanContext_DelegatesToProcessor() { when(logRecord.getSpanContext()).thenReturn(invalidSpanContext); - TraceBasedLogRecordProcessor processor = - TraceBasedLogRecordProcessor.builder().addProcessors(processor1, processor2).build(); + TraceBasedLogRecordProcessor processor = TraceBasedLogRecordProcessor.builder(delegate).build(); processor.onEmit(context, logRecord); - verify(processor1).onEmit(same(context), same(logRecord)); - verify(processor2).onEmit(same(context), same(logRecord)); + verify(delegate).onEmit(same(context), same(logRecord)); } @Test void onEmit_VariousSpanContexts() { - TraceBasedLogRecordProcessor processor = - TraceBasedLogRecordProcessor.builder().addProcessors(processor1).build(); + TraceBasedLogRecordProcessor processor = TraceBasedLogRecordProcessor.builder(delegate).build(); // Test sampled span context testSpanContext(processor, sampledSpanContext, /* shouldDelegate= */ true); @@ -163,45 +127,40 @@ private void testSpanContext( processor.onEmit(context, logRecord); if (shouldDelegate) { - verify(processor1).onEmit(same(context), same(logRecord)); + verify(delegate).onEmit(same(context), same(logRecord)); } else { - verify(processor1, never()).onEmit(same(context), same(logRecord)); + verify(delegate, never()).onEmit(same(context), same(logRecord)); } // Reset mock for next test - org.mockito.Mockito.reset(processor1); - when(processor1.shutdown()).thenReturn(CompletableResultCode.ofSuccess()); - when(processor1.forceFlush()).thenReturn(CompletableResultCode.ofSuccess()); + org.mockito.Mockito.reset(delegate); + when(delegate.shutdown()).thenReturn(CompletableResultCode.ofSuccess()); + when(delegate.forceFlush()).thenReturn(CompletableResultCode.ofSuccess()); } @Test - void shutdown_DelegatesToAllProcessors() { - TraceBasedLogRecordProcessor processor = - TraceBasedLogRecordProcessor.builder().addProcessors(processor1, processor2).build(); + void shutdown_DelegatesToProcessor() { + TraceBasedLogRecordProcessor processor = TraceBasedLogRecordProcessor.builder(delegate).build(); CompletableResultCode result = processor.shutdown(); - verify(processor1).shutdown(); - verify(processor2).shutdown(); + verify(delegate).shutdown(); assertThat(result.isSuccess()).isTrue(); } @Test - void forceFlush_DelegatesToAllProcessors() { - TraceBasedLogRecordProcessor processor = - TraceBasedLogRecordProcessor.builder().addProcessors(processor1, processor2).build(); + void forceFlush_DelegatesToProcessor() { + TraceBasedLogRecordProcessor processor = TraceBasedLogRecordProcessor.builder(delegate).build(); CompletableResultCode result = processor.forceFlush(); - verify(processor1).forceFlush(); - verify(processor2).forceFlush(); + verify(delegate).forceFlush(); assertThat(result.isSuccess()).isTrue(); } @Test void toString_Valid() { - TraceBasedLogRecordProcessor processor = - TraceBasedLogRecordProcessor.builder().addProcessors(processor1).build(); + TraceBasedLogRecordProcessor processor = TraceBasedLogRecordProcessor.builder(delegate).build(); String toString = processor.toString(); assertThat(toString).contains("TraceBasedLogRecordProcessor"); @@ -210,31 +169,25 @@ void toString_Valid() { @Test void shutdown_ProcessorFailure() { - when(processor1.shutdown()).thenReturn(CompletableResultCode.ofFailure()); - when(processor2.shutdown()).thenReturn(CompletableResultCode.ofSuccess()); + when(delegate.shutdown()).thenReturn(CompletableResultCode.ofFailure()); - TraceBasedLogRecordProcessor processor = - TraceBasedLogRecordProcessor.builder().addProcessors(processor1, processor2).build(); + TraceBasedLogRecordProcessor processor = TraceBasedLogRecordProcessor.builder(delegate).build(); CompletableResultCode result = processor.shutdown(); - verify(processor1).shutdown(); - verify(processor2).shutdown(); + verify(delegate).shutdown(); assertThat(result.isSuccess()).isFalse(); } @Test void forceFlush_ProcessorFailure() { - when(processor1.forceFlush()).thenReturn(CompletableResultCode.ofFailure()); - when(processor2.forceFlush()).thenReturn(CompletableResultCode.ofSuccess()); + when(delegate.forceFlush()).thenReturn(CompletableResultCode.ofFailure()); - TraceBasedLogRecordProcessor processor = - TraceBasedLogRecordProcessor.builder().addProcessors(processor1, processor2).build(); + TraceBasedLogRecordProcessor processor = TraceBasedLogRecordProcessor.builder(delegate).build(); CompletableResultCode result = processor.forceFlush(); - verify(processor1).forceFlush(); - verify(processor2).forceFlush(); + verify(delegate).forceFlush(); assertThat(result.isSuccess()).isFalse(); } }