7
7
8
8
import java .util .Objects ;
9
9
import java .util .Queue ;
10
- import java .util .concurrent .ArrayBlockingQueue ;
11
10
import java .util .concurrent .atomic .AtomicBoolean ;
12
11
import java .util .function .Consumer ;
13
12
import java .util .logging .Level ;
14
13
import java .util .logging .Logger ;
15
14
import org .jctools .queues .MessagePassingQueue ;
16
15
import org .jctools .queues .MpscArrayQueue ;
16
+ import org .jctools .queues .atomic .MpscAtomicArrayQueue ;
17
17
18
18
/**
19
19
* Internal accessor of JCTools package for fast queues.
@@ -25,11 +25,15 @@ public final class JcTools {
25
25
26
26
private static final AtomicBoolean queueCreationWarningLogged = new AtomicBoolean ();
27
27
private static final Logger logger = Logger .getLogger (JcTools .class .getName ());
28
+ private static final boolean PROACTIVELY_AVOID_UNSAFE = proactivelyAvoidUnsafe ();
28
29
29
30
/**
30
31
* Returns a new {@link Queue} appropriate for use with multiple producers and a single consumer.
31
32
*/
32
33
public static <T > Queue <T > newFixedSizeQueue (int capacity ) {
34
+ if (PROACTIVELY_AVOID_UNSAFE ) {
35
+ return new MpscAtomicArrayQueue <>(capacity );
36
+ }
33
37
try {
34
38
return new MpscArrayQueue <>(capacity );
35
39
} catch (java .lang .NoClassDefFoundError | java .lang .ExceptionInInitializerError e ) {
@@ -41,7 +45,7 @@ public static <T> Queue<T> newFixedSizeQueue(int capacity) {
41
45
}
42
46
// Happens when modules such as jdk.unsupported are disabled in a custom JRE distribution,
43
47
// or a security manager preventing access to Unsafe is installed.
44
- return new ArrayBlockingQueue <>(capacity );
48
+ return new MpscAtomicArrayQueue <>(capacity );
45
49
}
46
50
}
47
51
@@ -50,11 +54,7 @@ public static <T> Queue<T> newFixedSizeQueue(int capacity) {
50
54
* to use the shaded classes.
51
55
*/
52
56
public static long capacity (Queue <?> queue ) {
53
- if (queue instanceof MessagePassingQueue ) {
54
- return ((MessagePassingQueue <?>) queue ).capacity ();
55
- } else {
56
- return (long ) ((ArrayBlockingQueue <?>) queue ).remainingCapacity () + queue .size ();
57
- }
57
+ return ((MessagePassingQueue <?>) queue ).capacity ();
58
58
}
59
59
60
60
/**
@@ -65,22 +65,26 @@ public static long capacity(Queue<?> queue) {
65
65
*/
66
66
@ SuppressWarnings ("unchecked" )
67
67
public static <T > int drain (Queue <T > queue , int limit , Consumer <T > consumer ) {
68
- if (queue instanceof MessagePassingQueue ) {
69
- return ((MessagePassingQueue <T >) queue ).drain (consumer ::accept , limit );
70
- } else {
71
- return drainNonJcQueue (queue , limit , consumer );
72
- }
68
+ return ((MessagePassingQueue <T >) queue ).drain (consumer ::accept , limit );
73
69
}
74
70
75
- private static <T > int drainNonJcQueue (
76
- Queue <T > queue , int maxExportBatchSize , Consumer <T > consumer ) {
77
- int polledCount = 0 ;
78
- T item ;
79
- while (polledCount < maxExportBatchSize && (item = queue .poll ()) != null ) {
80
- consumer .accept (item );
81
- ++polledCount ;
71
+ private static boolean proactivelyAvoidUnsafe () {
72
+ double javaVersion = getJavaVersion ();
73
+ // Avoid Unsafe on Java 23+ due to JEP-498 deprecation warnings:
74
+ // "WARNING: A terminally deprecated method in sun.misc.Unsafe has been called"
75
+ return javaVersion >= 23 || javaVersion == -1 ;
76
+ }
77
+
78
+ private static double getJavaVersion () {
79
+ String specVersion = System .getProperty ("java.specification.version" );
80
+ if (specVersion != null ) {
81
+ try {
82
+ return Double .parseDouble (specVersion );
83
+ } catch (NumberFormatException exception ) {
84
+ // ignore
85
+ }
82
86
}
83
- return polledCount ;
87
+ return - 1 ;
84
88
}
85
89
86
90
private JcTools () {}
0 commit comments