Skip to content

Commit a3ba516

Browse files
committed
servlet: simplify synchronization in AsyncServletOutputStreamWriter
1 parent 437e03d commit a3ba516

File tree

2 files changed

+53
-119
lines changed

2 files changed

+53
-119
lines changed

gradle/libs.versions.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ signature-java = "org.codehaus.mojo.signature:java18:1.0"
112112
tomcat-embed-core = "org.apache.tomcat.embed:tomcat-embed-core:10.1.31"
113113
tomcat-embed-core9 = "org.apache.tomcat.embed:tomcat-embed-core:9.0.89"
114114
truth = "com.google.truth:truth:1.4.4"
115-
undertow-servlet22 = "io.undertow:undertow-servlet:2.2.32.Final"
115+
undertow-servlet22 = "io.undertow:undertow-servlet:2.2.37.Final"
116116
undertow-servlet = "io.undertow:undertow-servlet:2.3.18.Final"
117117

118118
# Do not update: Pinned to the last version supporting Java 8.

servlet/src/main/java/io/grpc/servlet/AsyncServletOutputStreamWriter.java

Lines changed: 52 additions & 118 deletions
Original file line numberDiff line numberDiff line change
@@ -16,53 +16,48 @@
1616

1717
package io.grpc.servlet;
1818

19-
import static com.google.common.base.Preconditions.checkState;
2019
import static io.grpc.servlet.ServletServerStream.toHexString;
2120
import static java.util.logging.Level.FINE;
2221
import static java.util.logging.Level.FINEST;
2322

2423
import com.google.common.annotations.VisibleForTesting;
25-
import com.google.errorprone.annotations.CheckReturnValue;
2624
import io.grpc.InternalLogId;
2725
import io.grpc.servlet.ServletServerStream.ServletTransportState;
2826
import java.io.IOException;
29-
import java.time.Duration;
3027
import java.util.Queue;
3128
import java.util.concurrent.ConcurrentLinkedQueue;
32-
import java.util.concurrent.atomic.AtomicReference;
33-
import java.util.concurrent.locks.LockSupport;
29+
import java.util.concurrent.locks.StampedLock;
3430
import java.util.function.BiFunction;
3531
import java.util.function.BooleanSupplier;
3632
import java.util.logging.Level;
3733
import java.util.logging.Logger;
38-
import javax.annotation.Nullable;
3934
import javax.servlet.AsyncContext;
4035
import javax.servlet.ServletOutputStream;
4136

4237
/** Handles write actions from the container thread and the application thread. */
4338
final class AsyncServletOutputStreamWriter {
4439

40+
private final StampedLock writeLock = new StampedLock();
41+
4542
/**
46-
* Memory boundary for write actions.
47-
*
48-
* <pre>
49-
* WriteState curState = writeState.get(); // mark a boundary
50-
* doSomething(); // do something within the boundary
51-
* boolean successful = writeState.compareAndSet(curState, newState); // try to mark a boundary
52-
* if (successful) {
53-
* // state has not changed since
54-
* return;
55-
* } else {
56-
* // state is changed by another thread while doSomething(), need recompute
57-
* }
58-
* </pre>
43+
* The servlet output stream is ready and the writeQueue is empty.
5944
*
6045
* <p>There are two threads, the container thread (calling {@code onWritePossible()}) and the
61-
* application thread (calling {@code runOrBuffer()}) that read and update the
62-
* writeState. Only onWritePossible() may turn {@code readyAndDrained} from false to true, and
46+
* application thread (calling {@code runOrBuffer()}) that read and update this field.
47+
* Only onWritePossible() may turn {@code readyAndDrained} from false to true, and
6348
* only runOrBuffer() may turn it from true to false.
49+
*
50+
* <p>readyAndDrained turns from false to true when:
51+
* {@code onWritePossible()} exits while currently there is no more data to write, but the last
52+
* check of {@link javax.servlet.ServletOutputStream#isReady()} is true.
53+
*
54+
* <p>readyAndDrained turns from true to false when:
55+
* {@code runOrBuffer()} exits while either the action item is written directly to the
56+
* servlet output stream and the check of {@link javax.servlet.ServletOutputStream#isReady()}
57+
* right after that returns false, or the action item is buffered into the writeQueue.
6458
*/
65-
private final AtomicReference<WriteState> writeState = new AtomicReference<>(WriteState.DEFAULT);
59+
// @GuardedBy("writeLock")
60+
private boolean readyAndDrained;
6661

6762
private final Log log;
6863
private final BiFunction<byte[], Integer, ActionItem> writeAction;
@@ -71,15 +66,10 @@ final class AsyncServletOutputStreamWriter {
7166
private final BooleanSupplier isReady;
7267

7368
/**
74-
* New write actions will be buffered into this queue if the servlet output stream is not ready or
75-
* the queue is not drained.
69+
* New write actions will be buffered into this queue.
7670
*/
7771
// SPSC queue would do
78-
private final Queue<ActionItem> writeChain = new ConcurrentLinkedQueue<>();
79-
// for a theoretical race condition that onWritePossible() is called immediately after isReady()
80-
// returns false and before writeState.compareAndSet()
81-
@Nullable
82-
private volatile Thread parkingThread;
72+
private final Queue<ActionItem> writeQueue = new ConcurrentLinkedQueue<>();
8373

8474
AsyncServletOutputStreamWriter(
8575
AsyncContext asyncContext,
@@ -128,7 +118,7 @@ public void finest(String str, Object... params) {
128118
log.fine("call completed");
129119
});
130120
};
131-
this.isReady = () -> outputStream.isReady();
121+
this.isReady = outputStream::isReady;
132122
}
133123

134124
/**
@@ -173,40 +163,21 @@ void complete() {
173163
/** Called from the container thread {@link javax.servlet.WriteListener#onWritePossible()}. */
174164
void onWritePossible() throws IOException {
175165
log.finest("onWritePossible: ENTRY. The servlet output stream becomes ready");
176-
assureReadyAndDrainedTurnsFalse();
177-
while (isReady.getAsBoolean()) {
178-
WriteState curState = writeState.get();
179-
180-
ActionItem actionItem = writeChain.poll();
181-
if (actionItem != null) {
166+
long stamp = writeLock.writeLock();
167+
try {
168+
while (isReady.getAsBoolean()) {
169+
ActionItem actionItem = writeQueue.poll();
170+
if (actionItem == null) {
171+
readyAndDrained = true;
172+
log.finest("onWritePossible: EXIT. Queue drained");
173+
return;
174+
}
182175
actionItem.run();
183-
continue;
184176
}
185-
186-
if (writeState.compareAndSet(curState, curState.withReadyAndDrained(true))) {
187-
// state has not changed since.
188-
log.finest(
189-
"onWritePossible: EXIT. All data available now is sent out and the servlet output"
190-
+ " stream is still ready");
191-
return;
192-
}
193-
// else, state changed by another thread (runOrBuffer()), need to drain the writeChain
194-
// again
195-
}
196-
log.finest("onWritePossible: EXIT. The servlet output stream becomes not ready");
197-
}
198-
199-
private void assureReadyAndDrainedTurnsFalse() {
200-
// readyAndDrained should have been set to false already.
201-
// Just in case due to a race condition readyAndDrained is still true at this moment and is
202-
// being set to false by runOrBuffer() concurrently.
203-
while (writeState.get().readyAndDrained) {
204-
parkingThread = Thread.currentThread();
205-
// Try to sleep for an extremely long time to avoid writeState being changed at exactly
206-
// the time when sleep time expires (in extreme scenario, such as #9917).
207-
LockSupport.parkNanos(Duration.ofHours(1).toNanos()); // should return immediately
177+
log.finest("onWritePossible: EXIT. The servlet output stream becomes not ready");
178+
} finally {
179+
writeLock.unlockWrite(stamp);
208180
}
209-
parkingThread = null;
210181
}
211182

212183
/**
@@ -216,31 +187,26 @@ private void assureReadyAndDrainedTurnsFalse() {
216187
* <p>Called from application thread.
217188
*/
218189
private void runOrBuffer(ActionItem actionItem) throws IOException {
219-
WriteState curState = writeState.get();
220-
if (curState.readyAndDrained) { // write to the outputStream directly
221-
actionItem.run();
222-
if (actionItem == completeAction) {
223-
return;
224-
}
225-
if (!isReady.getAsBoolean()) {
226-
boolean successful =
227-
writeState.compareAndSet(curState, curState.withReadyAndDrained(false));
228-
LockSupport.unpark(parkingThread);
229-
checkState(successful, "Bug: curState is unexpectedly changed by another thread");
230-
log.finest("the servlet output stream becomes not ready");
231-
}
232-
} else { // buffer to the writeChain
233-
writeChain.offer(actionItem);
234-
if (!writeState.compareAndSet(curState, curState.withReadyAndDrained(false))) {
235-
checkState(
236-
writeState.get().readyAndDrained,
237-
"Bug: onWritePossible() should have changed readyAndDrained to true, but not");
238-
ActionItem lastItem = writeChain.poll();
239-
if (lastItem != null) {
240-
checkState(lastItem == actionItem, "Bug: lastItem != actionItem");
241-
runOrBuffer(lastItem);
190+
writeQueue.offer(actionItem);
191+
long stamp = writeLock.tryWriteLock();
192+
if (stamp == 0L) {
193+
return;
194+
}
195+
try {
196+
if (readyAndDrained) { // write to the outputStream directly
197+
ActionItem toWrite = writeQueue.poll();
198+
if (toWrite != null) {
199+
toWrite.run();
200+
if (toWrite == completeAction) {
201+
return;
202+
}
203+
if (!isReady.getAsBoolean()) {
204+
readyAndDrained = false;
205+
}
242206
}
243-
} // state has not changed since
207+
}
208+
} finally {
209+
writeLock.unlockWrite(stamp);
244210
}
245211
}
246212

@@ -254,43 +220,11 @@ interface ActionItem {
254220
@VisibleForTesting // Lincheck test can not run with java.util.logging dependency.
255221
interface Log {
256222
default boolean isLoggable(Level level) {
257-
return false;
223+
return false;
258224
}
259225

260226
default void fine(String str, Object...params) {}
261227

262228
default void finest(String str, Object...params) {}
263229
}
264-
265-
private static final class WriteState {
266-
267-
static final WriteState DEFAULT = new WriteState(false);
268-
269-
/**
270-
* The servlet output stream is ready and the writeChain is empty.
271-
*
272-
* <p>readyAndDrained turns from false to true when:
273-
* {@code onWritePossible()} exits while currently there is no more data to write, but the last
274-
* check of {@link javax.servlet.ServletOutputStream#isReady()} is true.
275-
*
276-
* <p>readyAndDrained turns from true to false when:
277-
* {@code runOrBuffer()} exits while either the action item is written directly to the
278-
* servlet output stream and the check of {@link javax.servlet.ServletOutputStream#isReady()}
279-
* right after that returns false, or the action item is buffered into the writeChain.
280-
*/
281-
final boolean readyAndDrained;
282-
283-
WriteState(boolean readyAndDrained) {
284-
this.readyAndDrained = readyAndDrained;
285-
}
286-
287-
/**
288-
* Only {@code onWritePossible()} can set readyAndDrained to true, and only {@code
289-
* runOrBuffer()} can set it to false.
290-
*/
291-
@CheckReturnValue
292-
WriteState withReadyAndDrained(boolean readyAndDrained) {
293-
return new WriteState(readyAndDrained);
294-
}
295-
}
296230
}

0 commit comments

Comments
 (0)