From dfd8a963eca20938936ef425ee0802a73d9552f7 Mon Sep 17 00:00:00 2001 From: michael stack Date: Wed, 1 Oct 2025 21:31:02 -0700 Subject: [PATCH 1/2] Rebase --- fdbserver/RESTSimKmsVault.actor.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fdbserver/RESTSimKmsVault.actor.cpp b/fdbserver/RESTSimKmsVault.actor.cpp index 8c2e13e4dc0..d6023f5c64d 100644 --- a/fdbserver/RESTSimKmsVault.actor.cpp +++ b/fdbserver/RESTSimKmsVault.actor.cpp @@ -836,4 +836,4 @@ TEST_CASE("/restSimKmsVault/GetBlobMetadata/foo") { VaultResponse response = handleFetchBlobMetada(requestContent); validateBlobLookup(response, domIds); return Void(); -} \ No newline at end of file +} From 1cc86409ac5ea1a20f473be96d8a79754cc4f277 Mon Sep 17 00:00:00 2001 From: michael stack Date: Wed, 1 Oct 2025 08:51:43 -0700 Subject: [PATCH 2/2] Add process-local s3blobstore connection pool isolation in simulation. Prevent connection sharing and corruption across simulated processes. Eagerly close connections in simulation so that their resources are less likely to be harvested by another process. Add some HTTP specialization for the simulation case. 
--- fdbclient/S3BlobStore.actor.cpp | 45 ++++++++++++--- fdbclient/include/fdbclient/S3BlobStore.h | 69 ++++++++++++++++++++--- 2 files changed, 98 insertions(+), 16 deletions(-) diff --git a/fdbclient/S3BlobStore.actor.cpp b/fdbclient/S3BlobStore.actor.cpp index 9afcb097a82..20fbd7c5dd8 100644 --- a/fdbclient/S3BlobStore.actor.cpp +++ b/fdbclient/S3BlobStore.actor.cpp @@ -20,6 +20,8 @@ #include "fdbclient/S3BlobStore.h" +#include +#include "fdbrpc/HTTP.h" #include "fdbclient/ClientKnobs.h" #include "fdbclient/Knobs.h" #include "flow/FastRef.h" @@ -76,9 +78,6 @@ S3BlobStoreEndpoint::Stats S3BlobStoreEndpoint::s_stats; std::unique_ptr S3BlobStoreEndpoint::blobStats; Future S3BlobStoreEndpoint::statsLogger = Never(); -std::unordered_map> - S3BlobStoreEndpoint::globalConnectionPool; - S3BlobStoreEndpoint::BlobKnobs::BlobKnobs() { secure_connection = 1; connect_tries = CLIENT_KNOBS->BLOBSTORE_CONNECT_TRIES; @@ -199,6 +198,11 @@ std::string S3BlobStoreEndpoint::BlobKnobs::getURLParameters() const { } std::string guessRegionFromDomain(std::string domain) { + // Special case for localhost/127.0.0.1 to prevent basic_string exception + if (domain == "127.0.0.1" || domain == "localhost") { + return "us-east-1"; + } + static const std::vector knownServices = { "s3.", "cos.", "oss-", "obs." 
}; boost::algorithm::to_lower(domain); @@ -843,6 +847,10 @@ ACTOR Future connect_impl(Referenceconnect(host, service, isTLS))); } + + // Ensure connection is valid before handshake + ASSERT(conn.isValid()); + wait(conn->connectHandshake()); TraceEvent("S3BlobStoreEndpointNewConnectionSuccess") @@ -1030,6 +1038,12 @@ ACTOR Future> doRequest_impl(Referencedata.headers["Host"] = bstore->host; req->data.headers["Accept"] = "application/xml"; + // In simulation, disable connection pooling for MockS3 to prevent NetSAV use-after-free crashes + // This forces connection closure after each request, preventing race conditions during coordinator shutdown + if (g_network->isSimulated() && bstore->host == "127.0.0.1") { + req->data.headers["Connection"] = "close"; + } + // Avoid to send request with an empty resource. if (resource.empty()) { resource = "/"; @@ -1140,7 +1154,11 @@ ACTOR Future> doRequest_impl(ReferencesendRate, &bstore->s_stats.bytes_sent, bstore->recvRate); Reference _dryrunR = wait(timeoutError(dryrunResponse, requestTimeout)); dryrunR = _dryrunR; - std::string s3Error = parseErrorCodeFromS3(dryrunR->data.content); + // Only parse S3 error code for error responses (4xx/5xx), not successful responses (2xx) + std::string s3Error; + if (dryrunR->code >= 400) { + s3Error = parseErrorCodeFromS3(dryrunR->data.content); + } if (dryrunR->code == badRequestCode && isS3TokenError(s3Error)) { // authentication fails and s3 token error persists, retry with a HEAD dryrun request // to avoid sending duplicate data indefinitly to save network bandwidth @@ -1263,7 +1281,12 @@ ACTOR Future> doRequest_impl(Referencecode); - std::string s3Error = parseErrorCodeFromS3(r->data.content); + // Only parse S3 error code for real error responses (4xx/5xx), not successful responses (2xx) + // Skip parsing for simulated errors where response content is still binary data + std::string s3Error; + if (r->code >= 400 && !simulateS3TokenError) { + s3Error = 
parseErrorCodeFromS3(r->data.content); + } event.detail("S3ErrorCode", s3Error); if (r->code == badRequestCode) { if (isS3TokenError(s3Error) || simulateS3TokenError) { @@ -1460,7 +1483,8 @@ ACTOR Future listObjectsStream_impl(Reference bstore, if (key == nullptr) { throw http_bad_response(); } - object.name = key->value(); + // URL decode the object name since S3 XML responses contain URL-encoded names + object.name = HTTP::urlDecode(key->value()); xml_node<>* size = n->first_node("Size"); if (size == nullptr) { @@ -2035,8 +2059,11 @@ ACTOR Future readObject_impl(Reference bstore, try { // Copy the output bytes, server could have sent more or less bytes than requested so copy at most length // bytes - memcpy(data, r->data.content.data(), std::min(r->data.contentLen, length)); - return r->data.contentLen; + int bytesToCopy = std::min(r->data.contentLen, length); + memcpy(data, r->data.content.data(), bytesToCopy); + // Return the number of bytes actually copied, not the contentLen + // This ensures AsyncFileEncrypted gets blocks of the correct size (4KB) + return bytesToCopy; } catch (Error& e) { TraceEvent(SevWarn, "S3BlobStoreReadObjectMemcpyError").detail("Error", e.what()); throw io_error(); @@ -2441,4 +2468,4 @@ TEST_CASE("/backup/s3/guess_region") { ASSERT_EQ(e.code(), error_code_backup_invalid_url); } return Void(); -} \ No newline at end of file +} diff --git a/fdbclient/include/fdbclient/S3BlobStore.h b/fdbclient/include/fdbclient/S3BlobStore.h index e17e85dd387..c31ac1bf87f 100644 --- a/fdbclient/include/fdbclient/S3BlobStore.h +++ b/fdbclient/include/fdbclient/S3BlobStore.h @@ -117,13 +117,14 @@ class S3BlobStoreEndpoint : public ReferenceCounted { void maybeStartStatsLogger() { if (!blobStats && CLIENT_KNOBS->BLOBSTORE_ENABLE_LOGGING) { blobStats = std::make_unique(); - specialCounter( - blobStats->cc, "GlobalConnectionPoolCount", [this]() { return this->globalConnectionPool.size(); }); + specialCounter(blobStats->cc, "GlobalConnectionPoolCount", 
[this]() { + return this->getGlobalConnectionPool().size(); + }); specialCounter(blobStats->cc, "GlobalConnectionPoolSize", [this]() { // FIXME: could track this explicitly via an int variable with extra logic, but this should be small and // infrequent int totalConnections = 0; - for (auto& it : this->globalConnectionPool) { + for (auto& it : this->getGlobalConnectionPool()) { totalConnections += it.second->pool.size(); } return totalConnections; @@ -200,6 +201,48 @@ class S3BlobStoreEndpoint : public ReferenceCounted { struct ReusableConnection { Reference conn; double expirationTime; + // CROSS_PROCESS_FIX: Track which process created this connection + NetworkAddress creatingProcess; + + ReusableConnection() : expirationTime(0) { + if (g_network && g_network->isSimulated()) { + creatingProcess = g_network->getLocalAddress(); + } + } + + ReusableConnection(Reference c, double exp) : conn(c), expirationTime(exp) { + if (g_network && g_network->isSimulated()) { + creatingProcess = g_network->getLocalAddress(); + } + } + + // CROSS_PROCESS_FIX: Copy constructor with cross-process detection + ReusableConnection(const ReusableConnection& other) + : conn(other.conn), expirationTime(other.expirationTime), creatingProcess(other.creatingProcess) { + if (g_network && g_network->isSimulated() && creatingProcess.isValid() && + creatingProcess != g_network->getLocalAddress()) { + // Cross-process copy detected - invalidate the connection to prevent sharing + conn = Reference(); + expirationTime = 0; // Mark as expired + } + } + + // CROSS_PROCESS_FIX: Assignment operator with cross-process detection + ReusableConnection& operator=(const ReusableConnection& other) { + if (this != &other) { + conn = other.conn; + expirationTime = other.expirationTime; + creatingProcess = other.creatingProcess; + + if (g_network && g_network->isSimulated() && creatingProcess.isValid() && + creatingProcess != g_network->getLocalAddress()) { + // Cross-process assignment detected - invalidate the 
connection to prevent sharing + conn = Reference(); + expirationTime = 0; // Mark as expired + } + } + return *this; + } }; // basically, reference counted queue with option to add other fields @@ -208,7 +251,19 @@ class S3BlobStoreEndpoint : public ReferenceCounted { }; // global connection pool for multiple blobstore endpoints with same connection settings and request destination - static std::unordered_map> globalConnectionPool; + // CROSS_PROCESS_FIX: Make connection pool process-local to prevent cross-process connection sharing + static std::unordered_map>& getGlobalConnectionPool() { + // Use process address as key to separate connection pools per simulated process + static std::map>> + processConnectionPools; + + NetworkAddress currentProcess; + if (g_network && g_network->isSimulated()) { + currentProcess = g_network->getLocalAddress(); + } + + return processConnectionPools[currentProcess]; + } S3BlobStoreEndpoint(std::string const& host, std::string const& service, @@ -241,12 +296,12 @@ class S3BlobStoreEndpoint : public ReferenceCounted { connectionPool = makeReference(); } else { BlobStoreConnectionPoolKey key(host, service, region, knobs.isTLS()); - auto it = globalConnectionPool.find(key); - if (it != globalConnectionPool.end()) { + auto it = getGlobalConnectionPool().find(key); + if (it != getGlobalConnectionPool().end()) { connectionPool = it->second; } else { connectionPool = makeReference(); - globalConnectionPool.insert({ key, connectionPool }); + getGlobalConnectionPool().insert({ key, connectionPool }); } } ASSERT(connectionPool.isValid());