Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ signature-java = "org.codehaus.mojo.signature:java18:1.0"
tomcat-embed-core = "org.apache.tomcat.embed:tomcat-embed-core:10.1.31"
tomcat-embed-core9 = "org.apache.tomcat.embed:tomcat-embed-core:9.0.89"
truth = "com.google.truth:truth:1.4.4"
undertow-servlet22 = "io.undertow:undertow-servlet:2.2.32.Final"
undertow-servlet22 = "io.undertow:undertow-servlet:2.2.37.Final"
undertow-servlet = "io.undertow:undertow-servlet:2.3.18.Final"

# Do not update: Pinned to the last version supporting Java 8.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,53 +16,48 @@

package io.grpc.servlet;

import static com.google.common.base.Preconditions.checkState;
import static io.grpc.servlet.ServletServerStream.toHexString;
import static java.util.logging.Level.FINE;
import static java.util.logging.Level.FINEST;

import com.google.common.annotations.VisibleForTesting;
import com.google.errorprone.annotations.CheckReturnValue;
import io.grpc.InternalLogId;
import io.grpc.servlet.ServletServerStream.ServletTransportState;
import java.io.IOException;
import java.time.Duration;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;
import java.util.concurrent.locks.StampedLock;
import java.util.function.BiFunction;
import java.util.function.BooleanSupplier;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable;
import javax.servlet.AsyncContext;
import javax.servlet.ServletOutputStream;

/** Handles write actions from the container thread and the application thread. */
final class AsyncServletOutputStreamWriter {

private final StampedLock writeLock = new StampedLock();

/**
* Memory boundary for write actions.
*
* <pre>
* WriteState curState = writeState.get(); // mark a boundary
* doSomething(); // do something within the boundary
* boolean successful = writeState.compareAndSet(curState, newState); // try to mark a boundary
* if (successful) {
* // state has not changed since
* return;
* } else {
* // state is changed by another thread while doSomething(), need recompute
* }
* </pre>
* The servlet output stream is ready and the writeQueue is empty.
*
* <p>There are two threads, the container thread (calling {@code onWritePossible()}) and the
* application thread (calling {@code runOrBuffer()}) that read and update the
* writeState. Only onWritePossible() may turn {@code readyAndDrained} from false to true, and
* application thread (calling {@code runOrBuffer()}) that read and update this field.
* Only onWritePossible() may turn {@code readyAndDrained} from false to true, and
* only runOrBuffer() may turn it from true to false.
*
* <p>readyAndDrained turns from false to true when:
* {@code onWritePossible()} exits while currently there is no more data to write, but the last
* check of {@link javax.servlet.ServletOutputStream#isReady()} is true.
*
* <p>readyAndDrained turns from true to false when:
* {@code runOrBuffer()} exits while either the action item is written directly to the
* servlet output stream and the check of {@link javax.servlet.ServletOutputStream#isReady()}
* right after that returns false, or the action item is buffered into the writeQueue.
*/
private final AtomicReference<WriteState> writeState = new AtomicReference<>(WriteState.DEFAULT);
// @GuardedBy("writeLock")
private boolean readyAndDrained;

private final Log log;
private final BiFunction<byte[], Integer, ActionItem> writeAction;
Expand All @@ -71,15 +66,10 @@ final class AsyncServletOutputStreamWriter {
private final BooleanSupplier isReady;

/**
* New write actions will be buffered into this queue if the servlet output stream is not ready or
* the queue is not drained.
* New write actions will be buffered into this queue.
*/
// SPSC queue would do
private final Queue<ActionItem> writeChain = new ConcurrentLinkedQueue<>();
// for a theoretical race condition that onWritePossible() is called immediately after isReady()
// returns false and before writeState.compareAndSet()
@Nullable
private volatile Thread parkingThread;
private final Queue<ActionItem> writeQueue = new ConcurrentLinkedQueue<>();

AsyncServletOutputStreamWriter(
AsyncContext asyncContext,
Expand Down Expand Up @@ -128,7 +118,7 @@ public void finest(String str, Object... params) {
log.fine("call completed");
});
};
this.isReady = () -> outputStream.isReady();
this.isReady = outputStream::isReady;
}

/**
Expand Down Expand Up @@ -173,40 +163,21 @@ void complete() {
/** Called from the container thread {@link javax.servlet.WriteListener#onWritePossible()}. */
void onWritePossible() throws IOException {
log.finest("onWritePossible: ENTRY. The servlet output stream becomes ready");
assureReadyAndDrainedTurnsFalse();
while (isReady.getAsBoolean()) {
WriteState curState = writeState.get();

ActionItem actionItem = writeChain.poll();
if (actionItem != null) {
long stamp = writeLock.writeLock();
try {
while (isReady.getAsBoolean()) {
ActionItem actionItem = writeQueue.poll();
if (actionItem == null) {
readyAndDrained = true;
log.finest("onWritePossible: EXIT. Queue drained");
return;
}
actionItem.run();
continue;
}

if (writeState.compareAndSet(curState, curState.withReadyAndDrained(true))) {
// state has not changed since.
log.finest(
"onWritePossible: EXIT. All data available now is sent out and the servlet output"
+ " stream is still ready");
return;
}
// else, state changed by another thread (runOrBuffer()), need to drain the writeChain
// again
}
log.finest("onWritePossible: EXIT. The servlet output stream becomes not ready");
}

private void assureReadyAndDrainedTurnsFalse() {
// readyAndDrained should have been set to false already.
// Just in case due to a race condition readyAndDrained is still true at this moment and is
// being set to false by runOrBuffer() concurrently.
while (writeState.get().readyAndDrained) {
parkingThread = Thread.currentThread();
// Try to sleep for an extremely long time to avoid writeState being changed at exactly
// the time when sleep time expires (in extreme scenario, such as #9917).
LockSupport.parkNanos(Duration.ofHours(1).toNanos()); // should return immediately
log.finest("onWritePossible: EXIT. The servlet output stream becomes not ready");
} finally {
writeLock.unlockWrite(stamp);
}
parkingThread = null;
}

/**
Expand All @@ -216,31 +187,26 @@ private void assureReadyAndDrainedTurnsFalse() {
* <p>Called from application thread.
*/
private void runOrBuffer(ActionItem actionItem) throws IOException {
WriteState curState = writeState.get();
if (curState.readyAndDrained) { // write to the outputStream directly
actionItem.run();
if (actionItem == completeAction) {
return;
}
if (!isReady.getAsBoolean()) {
boolean successful =
writeState.compareAndSet(curState, curState.withReadyAndDrained(false));
LockSupport.unpark(parkingThread);
checkState(successful, "Bug: curState is unexpectedly changed by another thread");
log.finest("the servlet output stream becomes not ready");
}
} else { // buffer to the writeChain
writeChain.offer(actionItem);
if (!writeState.compareAndSet(curState, curState.withReadyAndDrained(false))) {
checkState(
writeState.get().readyAndDrained,
"Bug: onWritePossible() should have changed readyAndDrained to true, but not");
ActionItem lastItem = writeChain.poll();
if (lastItem != null) {
checkState(lastItem == actionItem, "Bug: lastItem != actionItem");
runOrBuffer(lastItem);
writeQueue.offer(actionItem);
long stamp = writeLock.tryWriteLock();
if (stamp == 0L) {
return;
}
try {
if (readyAndDrained) { // write to the outputStream directly
ActionItem toWrite = writeQueue.poll();
if (toWrite != null) {
toWrite.run();
if (toWrite == completeAction) {
return;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the earlier code, while returning from completeAction the unpacking of the locked thread was not done but now we are releasing the lock and always letting the container thread be unlocked and run the code in onWritePossible. Seems ok, just pointing out the difference as you probably know this code better.

}
if (!isReady.getAsBoolean()) {
readyAndDrained = false;
}
}
} // state has not changed since
}
} finally {
writeLock.unlockWrite(stamp);
}
}

Expand All @@ -254,43 +220,11 @@ interface ActionItem {
@VisibleForTesting // Lincheck test can not run with java.util.logging dependency.
interface Log {
default boolean isLoggable(Level level) {
return false;
return false;
}

default void fine(String str, Object...params) {}

default void finest(String str, Object...params) {}
}

private static final class WriteState {

static final WriteState DEFAULT = new WriteState(false);

/**
* The servlet output stream is ready and the writeChain is empty.
*
* <p>readyAndDrained turns from false to true when:
* {@code onWritePossible()} exits while currently there is no more data to write, but the last
* check of {@link javax.servlet.ServletOutputStream#isReady()} is true.
*
* <p>readyAndDrained turns from true to false when:
* {@code runOrBuffer()} exits while either the action item is written directly to the
* servlet output stream and the check of {@link javax.servlet.ServletOutputStream#isReady()}
* right after that returns false, or the action item is buffered into the writeChain.
*/
final boolean readyAndDrained;

WriteState(boolean readyAndDrained) {
this.readyAndDrained = readyAndDrained;
}

/**
* Only {@code onWritePossible()} can set readyAndDrained to true, and only {@code
* runOrBuffer()} can set it to false.
*/
@CheckReturnValue
WriteState withReadyAndDrained(boolean readyAndDrained) {
return new WriteState(readyAndDrained);
}
}
}