Skip to content

Commit b5163cf

Browse files
committed
PublisherAsBlockingIterable LinkedBlockingQueue -> SpscBlockingQueue
Motivation: LinkedBlockingQueue allows for multiple producers and multiple consumers. It uses LockSupport park in offer and unpark in take. LockSupport unpark on the EventLoop thread has been shown to impact throughput during benchmarks. Before: ``` ``` After: ``` ```
1 parent 450096e commit b5163cf

File tree

1 file changed

+308
-3
lines changed

1 file changed

+308
-3
lines changed

servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/PublisherAsBlockingIterable.java

Lines changed: 308 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,19 +27,23 @@
2727
import org.slf4j.Logger;
2828
import org.slf4j.LoggerFactory;
2929

30+
import java.util.Collection;
3031
import java.util.Iterator;
3132
import java.util.NoSuchElementException;
33+
import java.util.Queue;
3234
import java.util.concurrent.BlockingQueue;
33-
import java.util.concurrent.LinkedBlockingQueue;
3435
import java.util.concurrent.TimeUnit;
3536
import java.util.concurrent.TimeoutException;
37+
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
38+
import java.util.concurrent.locks.LockSupport;
3639
import javax.annotation.Nullable;
3740

3841
import static io.servicetalk.concurrent.api.SubscriberApiUtils.unwrapNullUnchecked;
3942
import static io.servicetalk.concurrent.api.SubscriberApiUtils.wrapNull;
4043
import static io.servicetalk.concurrent.internal.TerminalNotification.complete;
4144
import static io.servicetalk.concurrent.internal.TerminalNotification.error;
42-
import static io.servicetalk.utils.internal.PlatformDependent.throwException;
45+
import static io.servicetalk.utils.internal.PlatformDependent.newUnboundedSpscQueue;
46+
import static io.servicetalk.utils.internal.ThrowableUtils.throwException;
4347
import static java.lang.Math.min;
4448
import static java.lang.Thread.currentThread;
4549
import static java.util.Objects.requireNonNull;
@@ -101,7 +105,7 @@ private static final class SubscriberAndIterator<T> implements Subscriber<T>, Bl
101105

102106
SubscriberAndIterator(int queueCapacity) {
103107
requestN = queueCapacity;
104-
data = new LinkedBlockingQueue<>();
108+
data = new SpscBlockingQueue<>(newUnboundedSpscQueue(queueCapacity));
105109
}
106110

107111
@Override
@@ -261,4 +265,305 @@ private T processNext() {
261265
return unwrapNullUnchecked(signal);
262266
}
263267
}
268+
269+
private static final class SpscBlockingQueue<T> implements BlockingQueue<T> {
270+
/**
271+
* Amount of times to call {@link Thread#yield()} before calling {@link LockSupport#park()}.
272+
* {@link LockSupport#park()} can be expensive and if the producer is generating data it is likely we will see
273+
* it without park/unpark.
274+
*/
275+
private static final int POLL_YIELD_SPIN_COUNT =
276+
Integer.getInteger("io.servicetalk.concurrent.internal.blockingIterableYieldSpinCount", 1);
277+
/**
278+
* Amount of nanoseconds to spin on {@link Thread#yield()} before calling {@link LockSupport#parkNanos(long)}.
279+
* {@link LockSupport#parkNanos(long)} can be expensive and if the producer is generating data it is likely
280+
* we will see it without park/unpark.
281+
*/
282+
private static final long POLL_YIELD_SPIN_NS =
283+
Long.getLong("io.servicetalk.concurrent.internal.blockingIterableYieldSpinNs", 1024);
284+
@SuppressWarnings("rawtypes")
285+
private static final AtomicReferenceFieldUpdater<SpscBlockingQueue, Thread> consumerThreadUpdater =
286+
AtomicReferenceFieldUpdater.newUpdater(SpscBlockingQueue.class, Thread.class, "consumerThread");
287+
private static final Thread PRODUCED_THREAD = new Thread(() -> { });
288+
private final Queue<T> spscQueue;
289+
@Nullable
290+
private volatile Thread consumerThread;
291+
292+
SpscBlockingQueue(Queue<T> spscQueue) {
293+
this.spscQueue = requireNonNull(spscQueue);
294+
}
295+
296+
@Override
297+
public boolean add(final T t) {
298+
if (spscQueue.add(t)) {
299+
signalConsumer();
300+
return true;
301+
}
302+
return false;
303+
}
304+
305+
@Override
306+
public boolean offer(final T t) {
307+
if (spscQueue.offer(t)) {
308+
signalConsumer();
309+
return true;
310+
}
311+
return false;
312+
}
313+
314+
private void signalConsumer() {
315+
final Thread thread = consumerThreadUpdater.getAndSet(this, PRODUCED_THREAD);
316+
if (thread != null && thread != PRODUCED_THREAD) {
317+
LockSupport.unpark(thread);
318+
}
319+
}
320+
321+
@Override
322+
public T remove() {
323+
return spscQueue.remove();
324+
}
325+
326+
@Override
327+
public T poll() {
328+
return spscQueue.poll();
329+
}
330+
331+
@Override
332+
public T element() {
333+
final T t = poll();
334+
if (t == null) {
335+
throw new NoSuchElementException();
336+
}
337+
return t;
338+
}
339+
340+
@Override
341+
public T peek() {
342+
return spscQueue.peek();
343+
}
344+
345+
@Override
346+
public void put(final T t) {
347+
throw new UnsupportedOperationException();
348+
}
349+
350+
@Override
351+
public boolean offer(final T t, final long timeout, final TimeUnit unit) {
352+
throw new UnsupportedOperationException();
353+
}
354+
355+
@Override
356+
public T take() throws InterruptedException {
357+
final Thread currentThread = Thread.currentThread();
358+
for (;;) {
359+
final Thread thread = consumerThread;
360+
if (thread != null && thread != currentThread && thread != PRODUCED_THREAD) {
361+
throwTooManyConsumers(currentThread);
362+
} else if (thread == currentThread ||
363+
consumerThreadUpdater.compareAndSet(this, thread, currentThread)) {
364+
try {
365+
T item;
366+
int pollCount = 0;
367+
while ((item = spscQueue.poll()) == null) {
368+
// Benchmarks show that park/unpark is expensive when producer is the EventLoop thread and
369+
// unpark has to wakeup a thread that is parked. Yield has been shown to lower this cost
370+
// on the EventLoop thread and increase throughput in these scenarios.
371+
if (pollCount++ > POLL_YIELD_SPIN_COUNT) {
372+
LockSupport.park();
373+
} else {
374+
Thread.yield();
375+
}
376+
checkInterrupted();
377+
}
378+
379+
return item;
380+
} finally {
381+
// If this call changed the consumerThread before the poll call we should restore it after.
382+
// This should be done atomically in case another thread has produced concurrently and swapped
383+
// the value to PRODUCED_THREAD.
384+
if (thread != currentThread) {
385+
consumerThreadUpdater.compareAndSet(this, currentThread, null);
386+
}
387+
}
388+
}
389+
}
390+
}
391+
392+
@Override
393+
public T poll(final long timeout, final TimeUnit unit) throws InterruptedException {
394+
final Thread currentThread = Thread.currentThread();
395+
for (;;) {
396+
final Thread thread = consumerThread;
397+
if (thread != null && thread != currentThread && thread != PRODUCED_THREAD) {
398+
throwTooManyConsumers(currentThread);
399+
} else if (thread == currentThread ||
400+
consumerThreadUpdater.compareAndSet(this, thread, currentThread)) {
401+
try {
402+
final long originalNs = unit.toNanos(timeout);
403+
long remainingNs = originalNs;
404+
long beforeTimeNs = System.nanoTime();
405+
T item;
406+
while ((item = spscQueue.poll()) == null) {
407+
// Benchmarks show that park/unpark is expensive when producer is the EventLoop thread and
408+
// unpark has to wakeup a thread that is parked. Yield has been shown to lower this cost
409+
// on the EventLoop thread and increase throughput in these scenarios.
410+
if (originalNs - remainingNs > POLL_YIELD_SPIN_NS) {
411+
LockSupport.parkNanos(remainingNs);
412+
} else {
413+
Thread.yield();
414+
}
415+
checkInterrupted();
416+
final long afterTimeNs = System.nanoTime();
417+
final long durationNs = afterTimeNs - beforeTimeNs;
418+
if (durationNs > remainingNs) {
419+
return null;
420+
}
421+
remainingNs -= durationNs;
422+
beforeTimeNs = afterTimeNs;
423+
}
424+
425+
return item;
426+
} finally {
427+
// If this call changed the consumerThread before the poll call we should restore it after.
428+
// This should be done atomically in case another thread has produced concurrently and swapped
429+
// the value to PRODUCED_THREAD.
430+
if (thread != currentThread) {
431+
consumerThreadUpdater.compareAndSet(this, currentThread, null);
432+
}
433+
}
434+
}
435+
}
436+
}
437+
438+
private static void throwTooManyConsumers(Thread currentThread) {
439+
throw new IllegalStateException("Only single consumer allowed, current consumer: " + currentThread);
440+
}
441+
442+
private static void checkInterrupted() throws InterruptedException {
443+
if (Thread.interrupted()) {
444+
throw new InterruptedException();
445+
}
446+
}
447+
448+
@Override
449+
public int remainingCapacity() {
450+
return Integer.MAX_VALUE;
451+
}
452+
453+
@Override
454+
public boolean remove(final Object o) {
455+
if (spscQueue.remove(o)) {
456+
signalConsumer();
457+
return true;
458+
}
459+
return false;
460+
}
461+
462+
@Override
463+
public boolean containsAll(final Collection<?> c) {
464+
return spscQueue.containsAll(c);
465+
}
466+
467+
@Override
468+
public boolean addAll(final Collection<? extends T> c) {
469+
if (spscQueue.addAll(c)) {
470+
signalConsumer();
471+
return true;
472+
}
473+
return false;
474+
}
475+
476+
@Override
477+
public boolean removeAll(final Collection<?> c) {
478+
if (spscQueue.removeAll(c)) {
479+
signalConsumer();
480+
return true;
481+
}
482+
return false;
483+
}
484+
485+
@Override
486+
public boolean retainAll(final Collection<?> c) {
487+
if (spscQueue.retainAll(c)) {
488+
signalConsumer();
489+
return true;
490+
}
491+
return false;
492+
}
493+
494+
@Override
495+
public void clear() {
496+
spscQueue.clear();
497+
signalConsumer();
498+
}
499+
500+
@Override
501+
public int size() {
502+
return spscQueue.size();
503+
}
504+
505+
@Override
506+
public boolean isEmpty() {
507+
return spscQueue.isEmpty();
508+
}
509+
510+
@Override
511+
public boolean contains(final Object o) {
512+
return spscQueue.contains(o);
513+
}
514+
515+
@Override
516+
public Iterator<T> iterator() {
517+
return spscQueue.iterator();
518+
}
519+
520+
@Override
521+
public Object[] toArray() {
522+
return spscQueue.toArray();
523+
}
524+
525+
@Override
526+
public <T1> T1[] toArray(final T1[] a) {
527+
return spscQueue.toArray(a);
528+
}
529+
530+
@Override
531+
public int drainTo(final Collection<? super T> c) {
532+
int i = 0;
533+
T item;
534+
while ((item = poll()) != null) {
535+
if (c.add(item)) {
536+
++i;
537+
}
538+
}
539+
return i;
540+
}
541+
542+
@Override
543+
public int drainTo(final Collection<? super T> c, final int maxElements) {
544+
int i = 0;
545+
T item;
546+
while (i < maxElements && (item = poll()) != null) {
547+
if (c.add(item)) {
548+
++i;
549+
}
550+
}
551+
return i;
552+
}
553+
554+
@Override
555+
public boolean equals(Object o) {
556+
return o instanceof SpscBlockingQueue && spscQueue.equals(((SpscBlockingQueue<?>) o).spscQueue);
557+
}
558+
559+
@Override
560+
public int hashCode() {
561+
return spscQueue.hashCode();
562+
}
563+
564+
@Override
565+
public String toString() {
566+
return spscQueue.toString();
567+
}
568+
}
264569
}

0 commit comments

Comments
 (0)