Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion .evergreen/run-tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -132,4 +132,9 @@ echo "Running tests with Java ${JAVA_VERSION}"
${MULTI_MONGOS_URI_SYSTEM_PROPERTY} ${API_VERSION} ${GRADLE_EXTRA_VARS} \
${JAVA_SYSPROP_ASYNC_TRANSPORT} ${JAVA_SYSPROP_NETTY_SSL_PROVIDER} \
-Dorg.mongodb.test.fle.on.demand.credential.test.failure.enabled=true \
--stacktrace --info --continue ${TESTS}
--stacktrace --info --continue ${TESTS} | tee -a logs.txt

if grep -q 'LEAK:' logs.txt ; then
echo "Netty Leak detected, please inspect build log"
exit 1
fi
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ tasks.withType<Test> {

useJUnitPlatform()

jvmArgs.add("-Dio.netty.leakDetection.level=paranoid")

// Pass any `org.mongodb.*` system settings
systemProperties =
System.getProperties()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,9 @@ public ServerTuple selectServer(final ServerSelector serverSelector, final Opera
@Override
public void selectServerAsync(final ServerSelector serverSelector, final OperationContext operationContext,
final SingleResultCallback<ServerTuple> callback) {
isTrue("open", !isClosed());
if (isClosed()) {
callback.onResult(null, new MongoClientException("Cluster was closed during server selection."));
}

Timeout computedServerSelectionTimeout = operationContext.getTimeoutContext().computeServerSelectionTimeout();
ServerSelectionRequest request = new ServerSelectionRequest(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,8 @@
import static com.mongodb.assertions.Assertions.assertNull;
import static com.mongodb.assertions.Assertions.isTrue;
import static com.mongodb.assertions.Assertions.notNull;
import static com.mongodb.internal.async.AsyncRunnable.beginAsync;
import static com.mongodb.internal.TimeoutContext.createMongoTimeoutException;
import static com.mongodb.internal.async.AsyncRunnable.beginAsync;
import static com.mongodb.internal.async.ErrorHandlingResultCallback.errorHandlingCallback;
import static com.mongodb.internal.connection.Authenticator.shouldAuthenticate;
import static com.mongodb.internal.connection.CommandHelper.HELLO;
Expand Down Expand Up @@ -355,7 +355,7 @@ private Compressor createCompressor(final MongoCompressor mongoCompressor) {
public void close() {
// All but the first call is a no-op
if (!isClosed.getAndSet(true) && (stream != null)) {
stream.close();
stream.close();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ public ByteBuf asReadOnly() {

@Override
public ByteBuf duplicate() {
return new NettyByteBuf(proxied.duplicate().retain(), isWriting);
return new NettyByteBuf(proxied.retainedDuplicate(), isWriting);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -307,8 +307,7 @@ private void readAsync(final int numBytes, final AsyncCompletionHandler<ByteBuf>
composite.addComponent(next);
iter.remove();
} else {
next.retain();
composite.addComponent(next.readSlice(bytesNeededFromCurrentBuffer));
composite.addComponent(next.readRetainedSlice(bytesNeededFromCurrentBuffer));
}
composite.writerIndex(composite.writerIndex() + bytesNeededFromCurrentBuffer);
bytesNeeded -= bytesNeededFromCurrentBuffer;
Expand Down Expand Up @@ -349,7 +348,11 @@ private boolean hasBytesAvailable(final int numBytes) {
private void handleReadResponse(@Nullable final io.netty.buffer.ByteBuf buffer, @Nullable final Throwable t) {
PendingReader localPendingReader = withLock(lock, () -> {
if (buffer != null) {
pendingInboundBuffers.add(buffer.retain());
if (isClosed) {
pendingException = new MongoSocketException("Received data after the stream was closed.", address);
} else {
pendingInboundBuffers.add(buffer.retain());
}
} else {
pendingException = t;
}
Expand Down Expand Up @@ -378,7 +381,7 @@ public void close() {
for (Iterator<io.netty.buffer.ByteBuf> iterator = pendingInboundBuffers.iterator(); iterator.hasNext();) {
io.netty.buffer.ByteBuf nextByteBuf = iterator.next();
iterator.remove();
nextByteBuf.release();
nextByteBuf.release(nextByteBuf.refCnt());
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -495,6 +495,7 @@ void shouldWriteUtf8CString(final boolean useBranch, final BufferProvider buffer
@ParameterizedTest(name = "should get byte buffers as little endian. Parameters: useBranch={0}, bufferProvider={1}")
@MethodSource("bufferProvidersWithBranches")
void shouldGetByteBuffersAsLittleEndian(final boolean useBranch, final BufferProvider bufferProvider) {
List<ByteBuf> byteBuffers = new ArrayList<>();
try (ByteBufferBsonOutput out = new ByteBufferBsonOutput(bufferProvider)) {
byte[] v = {1, 0, 0, 0};
if (useBranch) {
Expand All @@ -504,7 +505,11 @@ void shouldGetByteBuffersAsLittleEndian(final boolean useBranch, final BufferPro
} else {
out.writeBytes(v);
}
assertEquals(1, out.getByteBuffers().get(0).getInt());

byteBuffers = out.getByteBuffers();
assertEquals(1, byteBuffers.get(0).getInt());
} finally {
byteBuffers.forEach(ByteBuf::release);
}
}

Expand Down Expand Up @@ -1017,6 +1022,7 @@ void shouldWriteInt32WithinSpanningBuffers(
final int expectedLastBufferPosition,
final BufferProvider bufferProvider) {

List<ByteBuf> buffers = new ArrayList<>();
try (ByteBufferBsonOutput output =
new ByteBufferBsonOutput(size -> bufferProvider.getBuffer(Integer.BYTES))) {

Expand All @@ -1028,12 +1034,14 @@ void shouldWriteInt32WithinSpanningBuffers(

//then
//getByteBuffers returns ByteBuffers with limit() set to position, position set to 0.
List<ByteBuf> buffers = output.getByteBuffers();
buffers = output.getByteBuffers();
assertEquals(expectedBuffers.size(), buffers.size(), "Number of buffers mismatch");
assertBufferContents(expectedBuffers, buffers);

assertEquals(expectedLastBufferPosition, buffers.get(buffers.size() - 1).limit());
assertEquals(expectedOutputPosition, output.getPosition());
} finally {
buffers.forEach(ByteBuf::release);
}
}

Expand All @@ -1049,6 +1057,7 @@ void shouldWriteInt64WithinSpanningBuffers(
final int expectedLastBufferPosition,
final BufferProvider bufferProvider) {

List<ByteBuf> buffers = new ArrayList<>();
try (ByteBufferBsonOutput output =
new ByteBufferBsonOutput(size -> bufferProvider.getBuffer(Long.BYTES))) {

Expand All @@ -1060,12 +1069,14 @@ void shouldWriteInt64WithinSpanningBuffers(

//then
//getByteBuffers returns ByteBuffers with limit() set to position, position set to 0.
List<ByteBuf> buffers = output.getByteBuffers();
buffers = output.getByteBuffers();
assertEquals(expectedBuffers.size(), buffers.size(), "Number of buffers mismatch");
assertBufferContents(expectedBuffers, buffers);

assertEquals(expectedLastBufferPosition, buffers.get(buffers.size() - 1).limit());
assertEquals(expectedOutputPosition, output.getPosition());
} finally {
buffers.forEach(ByteBuf::release);
}
}

Expand All @@ -1081,6 +1092,7 @@ void shouldWriteDoubleWithinSpanningBuffers(
final int expectedLastBufferPosition,
final BufferProvider bufferProvider) {

List<ByteBuf> buffers = new ArrayList<>();
try (ByteBufferBsonOutput output =
new ByteBufferBsonOutput(size -> bufferProvider.getBuffer(Long.BYTES))) {

Expand All @@ -1092,12 +1104,14 @@ void shouldWriteDoubleWithinSpanningBuffers(

//then
//getByteBuffers returns ByteBuffers with limit() set to position, position set to 0.
List<ByteBuf> buffers = output.getByteBuffers();
buffers = output.getByteBuffers();
assertEquals(expectedBuffers.size(), buffers.size(), "Number of buffers mismatch");
assertBufferContents(expectedBuffers, buffers);

assertEquals(expectedLastBufferPosition, buffers.get(buffers.size() - 1).limit());
assertEquals(expectedOutputPosition, output.getPosition());
} finally {
buffers.forEach(ByteBuf::release);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import static com.mongodb.MongoException.TRANSIENT_TRANSACTION_ERROR_LABEL;
import static com.mongodb.MongoException.UNKNOWN_TRANSACTION_COMMIT_RESULT_LABEL;
import static com.mongodb.ReadPreference.primary;
import static com.mongodb.assertions.Assertions.isTrue;
import static com.mongodb.assertions.Assertions.notNull;
import static com.mongodb.internal.TimeoutContext.createTimeoutContext;
import static com.mongodb.reactivestreams.client.internal.MongoOperationPublisher.sinkToCallback;
Expand All @@ -73,6 +74,7 @@ public class OperationExecutorImpl implements OperationExecutor {
@Override
public <T> Mono<T> execute(final AsyncReadOperation<T> operation, final ReadPreference readPreference, final ReadConcern readConcern,
@Nullable final ClientSession session) {
isTrue("open", !mongoClient.getCluster().isClosed());
notNull("operation", operation);
notNull("readPreference", readPreference);
notNull("readConcern", readConcern);
Expand Down Expand Up @@ -109,6 +111,7 @@ public <T> Mono<T> execute(final AsyncReadOperation<T> operation, final ReadPref
@Override
public <T> Mono<T> execute(final AsyncWriteOperation<T> operation, final ReadConcern readConcern,
@Nullable final ClientSession session) {
isTrue("open", !mongoClient.getCluster().isClosed());
notNull("operation", operation);
notNull("readConcern", readConcern);

Expand Down