From f620b21e4ad7d05622c3bea71b489fee458480cd Mon Sep 17 00:00:00 2001 From: Trask Stalnaker Date: Sun, 21 Sep 2025 12:41:59 -0700 Subject: [PATCH 1/3] Proactively avoid Unsafe on Java 23+ --- .../LoggerProviderConfigurationTest.java | 5 +-- .../sdk/trace/internal/JcTools.java | 44 ++++++++++--------- .../internal/JcToolsSecurityManagerTest.java | 4 +- .../sdk/trace/internal/JcToolsTest.java | 43 ++---------------- 4 files changed, 32 insertions(+), 64 deletions(-) diff --git a/sdk-extensions/autoconfigure/src/testFullConfig/java/io/opentelemetry/sdk/autoconfigure/LoggerProviderConfigurationTest.java b/sdk-extensions/autoconfigure/src/testFullConfig/java/io/opentelemetry/sdk/autoconfigure/LoggerProviderConfigurationTest.java index 7a5473e8dea..9609f8cb4d3 100644 --- a/sdk-extensions/autoconfigure/src/testFullConfig/java/io/opentelemetry/sdk/autoconfigure/LoggerProviderConfigurationTest.java +++ b/sdk-extensions/autoconfigure/src/testFullConfig/java/io/opentelemetry/sdk/autoconfigure/LoggerProviderConfigurationTest.java @@ -19,7 +19,6 @@ import io.opentelemetry.sdk.logs.export.BatchLogRecordProcessor; import io.opentelemetry.sdk.logs.export.LogRecordExporter; import io.opentelemetry.sdk.logs.export.SimpleLogRecordProcessor; -import io.opentelemetry.sdk.trace.internal.JcTools; import java.io.Closeable; import java.util.ArrayList; import java.util.Collections; @@ -27,7 +26,6 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; -import java.util.Queue; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.TimeUnit; import org.junit.jupiter.api.Test; @@ -140,7 +138,8 @@ void configureBatchLogRecordProcessor() { assertThat(worker) .extracting("queue") .isInstanceOfSatisfying( - Queue.class, queue -> assertThat(JcTools.capacity(queue)).isEqualTo(2)); + ArrayBlockingQueue.class, + queue -> assertThat(queue.remainingCapacity()).isEqualTo(2)); assertThat(worker) .extracting("logRecordExporter") .isInstanceOf(SystemOutLogRecordExporter.class); diff --git a/sdk/trace-shaded-deps/src/main/java/io/opentelemetry/sdk/trace/internal/JcTools.java b/sdk/trace-shaded-deps/src/main/java/io/opentelemetry/sdk/trace/internal/JcTools.java index fa009ebdc0f..69844404190 100644 --- a/sdk/trace-shaded-deps/src/main/java/io/opentelemetry/sdk/trace/internal/JcTools.java +++ b/sdk/trace-shaded-deps/src/main/java/io/opentelemetry/sdk/trace/internal/JcTools.java @@ -7,13 +7,13 @@ import java.util.Objects; import java.util.Queue; -import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Consumer; import java.util.logging.Level; import java.util.logging.Logger; import org.jctools.queues.MessagePassingQueue; import org.jctools.queues.MpscArrayQueue; +import org.jctools.queues.atomic.MpscAtomicArrayQueue; /** * Internal accessor of JCTools package for fast queues. @@ -25,11 +25,15 @@ public final class JcTools { private static final AtomicBoolean queueCreationWarningLogged = new AtomicBoolean(); private static final Logger logger = Logger.getLogger(JcTools.class.getName()); + private static final boolean PROACTIVELY_AVOID_UNSAFE = proactivelyAvoidUnsafe(); /** * Returns a new {@link Queue} appropriate for use with multiple producers and a single consumer. */ public static Queue newFixedSizeQueue(int capacity) { + if (PROACTIVELY_AVOID_UNSAFE) { + return new MpscAtomicArrayQueue<>(capacity); + } try { return new MpscArrayQueue<>(capacity); } catch (java.lang.NoClassDefFoundError | java.lang.ExceptionInInitializerError e) { @@ -41,7 +45,7 @@ public static Queue newFixedSizeQueue(int capacity) { } // Happens when modules such as jdk.unsupported are disabled in a custom JRE distribution, // or a security manager preventing access to Unsafe is installed. - return new ArrayBlockingQueue<>(capacity); + return new MpscAtomicArrayQueue<>(capacity); } } @@ -50,11 +54,7 @@ public static Queue newFixedSizeQueue(int capacity) { * to use the shaded classes. */ public static long capacity(Queue queue) { - if (queue instanceof MessagePassingQueue) { - return ((MessagePassingQueue) queue).capacity(); - } else { - return (long) ((ArrayBlockingQueue) queue).remainingCapacity() + queue.size(); - } + return ((MessagePassingQueue) queue).capacity(); } /** @@ -65,22 +65,26 @@ public static long capacity(Queue queue) { */ @SuppressWarnings("unchecked") public static int drain(Queue queue, int limit, Consumer consumer) { - if (queue instanceof MessagePassingQueue) { - return ((MessagePassingQueue) queue).drain(consumer::accept, limit); - } else { - return drainNonJcQueue(queue, limit, consumer); - } + return ((MessagePassingQueue) queue).drain(consumer::accept, limit); } - private static int drainNonJcQueue( - Queue queue, int maxExportBatchSize, Consumer consumer) { - int polledCount = 0; - T item; - while (polledCount < maxExportBatchSize && (item = queue.poll()) != null) { - consumer.accept(item); - ++polledCount; + private static boolean proactivelyAvoidUnsafe() { + double javaVersion = getJavaVersion(); + // Avoid Unsafe on Java 23+ due to JEP-498 deprecation warnings: + // "WARNING: A terminally deprecated method in sun.misc.Unsafe has been called" + return javaVersion >= 23 || javaVersion == -1; + } + + private static double getJavaVersion() { + String specVersion = System.getProperty("java.specification.version"); + if (specVersion != null) { + try { + return Double.parseDouble(specVersion); + } catch (NumberFormatException exception) { + // ignore + } } - return polledCount; + return -1; } private JcTools() {} diff --git a/sdk/trace-shaded-deps/src/test/java/io/opentelemetry/sdk/trace/internal/JcToolsSecurityManagerTest.java b/sdk/trace-shaded-deps/src/test/java/io/opentelemetry/sdk/trace/internal/JcToolsSecurityManagerTest.java index eb2bd9199e3..2aad0ee64e7 100644 --- a/sdk/trace-shaded-deps/src/test/java/io/opentelemetry/sdk/trace/internal/JcToolsSecurityManagerTest.java +++ b/sdk/trace-shaded-deps/src/test/java/io/opentelemetry/sdk/trace/internal/JcToolsSecurityManagerTest.java @@ -11,7 +11,7 @@ import java.security.AccessController; import java.security.PrivilegedAction; import java.util.Queue; -import java.util.concurrent.ArrayBlockingQueue; +import org.jctools.queues.atomic.MpscAtomicArrayQueue; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.condition.EnabledOnJre; import org.junit.jupiter.api.condition.JRE; @@ -30,7 +30,7 @@ void newFixedSizeQueue_SunMiscProhibited() { Queue queue = AccessController.doPrivileged( (PrivilegedAction>) () -> JcTools.newFixedSizeQueue(10)); - assertThat(queue).isInstanceOf(ArrayBlockingQueue.class); + assertThat(queue).isInstanceOf(MpscAtomicArrayQueue.class); } finally { System.setSecurityManager(null); } diff --git a/sdk/trace-shaded-deps/src/test/java/io/opentelemetry/sdk/trace/internal/JcToolsTest.java b/sdk/trace-shaded-deps/src/test/java/io/opentelemetry/sdk/trace/internal/JcToolsTest.java index edb0168d2ce..1fad53021da 100644 --- a/sdk/trace-shaded-deps/src/test/java/io/opentelemetry/sdk/trace/internal/JcToolsTest.java +++ b/sdk/trace-shaded-deps/src/test/java/io/opentelemetry/sdk/trace/internal/JcToolsTest.java @@ -9,41 +9,18 @@ import java.util.ArrayList; import java.util.Queue; -import java.util.concurrent.ArrayBlockingQueue; -import org.jctools.queues.MpscArrayQueue; +import org.jctools.queues.MessagePassingQueue; import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; -import org.mockito.junit.jupiter.MockitoExtension; -import org.mockito.junit.jupiter.MockitoSettings; -import org.mockito.quality.Strictness; -@ExtendWith(MockitoExtension.class) -@MockitoSettings(strictness = Strictness.LENIENT) class JcToolsTest { ArrayList batch = new ArrayList<>(10); - @Test - void drain_ArrayBlockingQueue() { - // Arrange - batch.add("Test3"); - Queue queue = new ArrayBlockingQueue<>(10); - queue.add("Test1"); - queue.add("Test2"); - - // Act - JcTools.drain(queue, 5, batch::add); - - // Assert - assertThat(batch).hasSize(3); - assertThat(queue).hasSize(0); - } - @Test void drain_MessagePassingQueue() { // Arrange batch.add("Test3"); - Queue queue = new MpscArrayQueue<>(10); + Queue queue = JcTools.newFixedSizeQueue(10); queue.add("Test1"); queue.add("Test2"); @@ -58,7 +35,7 @@ void drain_MessagePassingQueue() { @Test void drain_MaxBatch() { // Arrange - Queue queue = new MpscArrayQueue<>(10); + Queue queue = JcTools.newFixedSizeQueue(10); queue.add("Test1"); queue.add("Test2"); @@ -79,7 +56,7 @@ void newFixedSize_MpscQueue() { Queue objects = JcTools.newFixedSizeQueue(capacity); // Assert - assertThat(objects).isInstanceOf(MpscArrayQueue.class); + assertThat(objects).isInstanceOf(MessagePassingQueue.class); } @Test @@ -94,16 +71,4 @@ void capacity_MpscQueue() { // Assert assertThat(queueSize).isGreaterThan(capacity); } - - @Test - void capacity_ArrayBlockingQueue() { - // Arrange - Queue queue = new ArrayBlockingQueue<>(10); - - // Act - long queueSize = JcTools.capacity(queue); - - // Assert - assertThat(queueSize).isEqualTo(10); - } } From ac4b4bed66d2e55cc0dbb3420733c114da82aede Mon Sep 17 00:00:00 2001 From: Trask Stalnaker Date: Tue, 23 Sep 2025 10:28:23 -0700 Subject: [PATCH 2/3] Optional --- .../io/opentelemetry/sdk/trace/internal/JcTools.java | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/sdk/trace-shaded-deps/src/main/java/io/opentelemetry/sdk/trace/internal/JcTools.java b/sdk/trace-shaded-deps/src/main/java/io/opentelemetry/sdk/trace/internal/JcTools.java index 69844404190..964d46f7608 100644 --- a/sdk/trace-shaded-deps/src/main/java/io/opentelemetry/sdk/trace/internal/JcTools.java +++ b/sdk/trace-shaded-deps/src/main/java/io/opentelemetry/sdk/trace/internal/JcTools.java @@ -6,6 +6,7 @@ package io.opentelemetry.sdk.trace.internal; import java.util.Objects; +import java.util.Optional; import java.util.Queue; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Consumer; @@ -69,22 +70,22 @@ public static int drain(Queue queue, int limit, Consumer consumer) { } private static boolean proactivelyAvoidUnsafe() { - double javaVersion = getJavaVersion(); + Optional javaVersion = getJavaVersion(); // Avoid Unsafe on Java 23+ due to JEP-498 deprecation warnings: // "WARNING: A terminally deprecated method in sun.misc.Unsafe has been called" - return javaVersion >= 23 || javaVersion == -1; + return javaVersion.map(version -> version >= 23).orElse(true); } - private static double getJavaVersion() { + private static Optional getJavaVersion() { String specVersion = System.getProperty("java.specification.version"); if (specVersion != null) { try { - return Double.parseDouble(specVersion); + return Optional.of(Double.parseDouble(specVersion)); } catch (NumberFormatException exception) { // ignore } } - return -1; + return Optional.empty(); } private JcTools() {} From 1e86329c4ac00ecad102f6854f21ec9f3d40aac2 Mon Sep 17 00:00:00 2001 From: Trask Stalnaker Date: Tue, 23 Sep 2025 15:05:18 -0700 Subject: [PATCH 3/3] Always use safe queue --- .../sdk/trace/internal/JcTools.java | 46 +------------------ 1 file changed, 1 insertion(+), 45 deletions(-) diff --git a/sdk/trace-shaded-deps/src/main/java/io/opentelemetry/sdk/trace/internal/JcTools.java b/sdk/trace-shaded-deps/src/main/java/io/opentelemetry/sdk/trace/internal/JcTools.java index 964d46f7608..1ea3a6d8778 100644 --- a/sdk/trace-shaded-deps/src/main/java/io/opentelemetry/sdk/trace/internal/JcTools.java +++ b/sdk/trace-shaded-deps/src/main/java/io/opentelemetry/sdk/trace/internal/JcTools.java @@ -5,15 +5,9 @@ package io.opentelemetry.sdk.trace.internal; -import java.util.Objects; -import java.util.Optional; import java.util.Queue; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Consumer; -import java.util.logging.Level; -import java.util.logging.Logger; import org.jctools.queues.MessagePassingQueue; -import org.jctools.queues.MpscArrayQueue; import org.jctools.queues.atomic.MpscAtomicArrayQueue; /** @@ -24,30 +18,11 @@ */ public final class JcTools { - private static final AtomicBoolean queueCreationWarningLogged = new AtomicBoolean(); - private static final Logger logger = Logger.getLogger(JcTools.class.getName()); - private static final boolean PROACTIVELY_AVOID_UNSAFE = proactivelyAvoidUnsafe(); - /** * Returns a new {@link Queue} appropriate for use with multiple producers and a single consumer. */ public static Queue newFixedSizeQueue(int capacity) { - if (PROACTIVELY_AVOID_UNSAFE) { - return new MpscAtomicArrayQueue<>(capacity); - } - try { - return new MpscArrayQueue<>(capacity); - } catch (java.lang.NoClassDefFoundError | java.lang.ExceptionInInitializerError e) { - if (!queueCreationWarningLogged.getAndSet(true)) { - logger.log( - Level.WARNING, - "Cannot create high-performance queue, reverting to ArrayBlockingQueue ({0})", - Objects.toString(e, "unknown cause")); - } - // Happens when modules such as jdk.unsupported are disabled in a custom JRE distribution, - // or a security manager preventing access to Unsafe is installed. - return new MpscAtomicArrayQueue<>(capacity); - } + return new MpscAtomicArrayQueue<>(capacity); } /** @@ -69,24 +44,5 @@ public static int drain(Queue queue, int limit, Consumer consumer) { return ((MessagePassingQueue) queue).drain(consumer::accept, limit); } - private static boolean proactivelyAvoidUnsafe() { - Optional javaVersion = getJavaVersion(); - // Avoid Unsafe on Java 23+ due to JEP-498 deprecation warnings: - // "WARNING: A terminally deprecated method in sun.misc.Unsafe has been called" - return javaVersion.map(version -> version >= 23).orElse(true); - } - - private static Optional getJavaVersion() { - String specVersion = System.getProperty("java.specification.version"); - if (specVersion != null) { - try { - return Optional.of(Double.parseDouble(specVersion)); - } catch (NumberFormatException exception) { - // ignore - } - } - return Optional.empty(); - } - private JcTools() {} }