Skip to content

Commit af1983e

Browse files
Fix racy response leak check in gRPC ProtocolCompatibilityTest (#3360)
Motivation: #3354 introduced `ProtocolCompatibilityTest.ResponseLeakValidator`. However, the test thread that asserts `pendingRequests` counter can race with `GrpcUtils.validateResponseAndGetPayload` logic for Trailers-Only response draining that executes inside `afterOnError` callback. Modifications: - Use `CountDownLatch` to ensure all new requests were terminated. Result: Fixes #3356
1 parent 2f68c75 commit af1983e

File tree

1 file changed

+16
-6
lines changed

1 file changed

+16
-6
lines changed

servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/netty/ProtocolCompatibilityTest.java

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -117,9 +117,13 @@
117117
import java.util.Collection;
118118
import java.util.Collections;
119119
import java.util.List;
120+
import java.util.Map;
120121
import java.util.Queue;
121122
import java.util.concurrent.ArrayBlockingQueue;
122123
import java.util.concurrent.BlockingQueue;
124+
import java.util.concurrent.ConcurrentHashMap;
125+
import java.util.concurrent.ConcurrentMap;
126+
import java.util.concurrent.CountDownLatch;
123127
import java.util.concurrent.ExecutionException;
124128
import java.util.concurrent.Future;
125129
import java.util.concurrent.atomic.AtomicInteger;
@@ -214,7 +218,7 @@ public SocketAddress listenAddress() {
214218
private final ResponseLeakValidator responseLeakValidator = new ResponseLeakValidator();
215219

216220
@AfterEach
217-
void finalChecks() {
221+
void finalChecks() throws Exception {
218222
responseLeakValidator.assertNoPendingRequests();
219223
}
220224

@@ -1876,7 +1880,8 @@ private CompatResponse response(final int value) throws Exception {
18761880

18771881
private static final class ResponseLeakValidator implements StreamingHttpClientFilterFactory {
18781882

1879-
private final AtomicInteger pendingRequests = new AtomicInteger();
1883+
private final AtomicInteger requestCounter = new AtomicInteger();
1884+
private final ConcurrentMap<String, CountDownLatch> pendingRequests = new ConcurrentHashMap<>();
18801885

18811886
@Override
18821887
public StreamingHttpClientFilter create(FilterableStreamingHttpClient client) {
@@ -1885,9 +1890,11 @@ public StreamingHttpClientFilter create(FilterableStreamingHttpClient client) {
18851890
protected Single<StreamingHttpResponse> request(StreamingHttpRequester delegate,
18861891
StreamingHttpRequest request) {
18871892
return Single.defer(() -> {
1888-
pendingRequests.incrementAndGet();
1893+
CountDownLatch requestLatch = new CountDownLatch(1);
1894+
pendingRequests.put(requestCounter.incrementAndGet() + ". " + request.requestTarget(),
1895+
requestLatch);
18891896
return delegate.request(request)
1890-
.liftSync(new BeforeFinallyHttpOperator(pendingRequests::decrementAndGet))
1897+
.liftSync(new BeforeFinallyHttpOperator(requestLatch::countDown))
18911898
.shareContextOnSubscribe();
18921899
});
18931900
}
@@ -1899,8 +1906,11 @@ public HttpExecutionStrategy requiredOffloads() {
18991906
return HttpExecutionStrategies.offloadNone();
19001907
}
19011908

1902-
void assertNoPendingRequests() {
1903-
assertThat("Detected pending requests, possible response leak", pendingRequests.get(), is(0));
1909+
void assertNoPendingRequests() throws Exception {
1910+
for (Map.Entry<String, CountDownLatch> entry : pendingRequests.entrySet()) {
1911+
assertThat("Request #" + entry.getKey() + "was not properly drained, possible response leak",
1912+
entry.getValue().await(5, SECONDS), is(true));
1913+
}
19041914
}
19051915
}
19061916
}

0 commit comments

Comments
 (0)