Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 36 additions & 9 deletions fdbclient/S3BlobStore.actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

#include "fdbclient/S3BlobStore.h"

#include <sstream>
#include "fdbrpc/HTTP.h"
#include "fdbclient/ClientKnobs.h"
#include "fdbclient/Knobs.h"
#include "flow/FastRef.h"
Expand Down Expand Up @@ -76,9 +78,6 @@ S3BlobStoreEndpoint::Stats S3BlobStoreEndpoint::s_stats;
std::unique_ptr<S3BlobStoreEndpoint::BlobStats> S3BlobStoreEndpoint::blobStats;
Future<Void> S3BlobStoreEndpoint::statsLogger = Never();

std::unordered_map<BlobStoreConnectionPoolKey, Reference<S3BlobStoreEndpoint::ConnectionPoolData>>
S3BlobStoreEndpoint::globalConnectionPool;

S3BlobStoreEndpoint::BlobKnobs::BlobKnobs() {
secure_connection = 1;
connect_tries = CLIENT_KNOBS->BLOBSTORE_CONNECT_TRIES;
Expand Down Expand Up @@ -199,6 +198,11 @@ std::string S3BlobStoreEndpoint::BlobKnobs::getURLParameters() const {
}

std::string guessRegionFromDomain(std::string domain) {
// Special case for localhost/127.0.0.1 to prevent basic_string exception
if (domain == "127.0.0.1" || domain == "localhost") {
return "us-east-1";
}

static const std::vector<const char*> knownServices = { "s3.", "cos.", "oss-", "obs." };
boost::algorithm::to_lower(domain);

Expand Down Expand Up @@ -843,6 +847,10 @@ ACTOR Future<S3BlobStoreEndpoint::ReusableConnection> connect_impl(Reference<S3B
} else {
wait(store(conn, INetworkConnections::net()->connect(host, service, isTLS)));
}

// Ensure connection is valid before handshake
ASSERT(conn.isValid());

wait(conn->connectHandshake());

TraceEvent("S3BlobStoreEndpointNewConnectionSuccess")
Expand Down Expand Up @@ -1030,6 +1038,12 @@ ACTOR Future<Reference<HTTP::IncomingResponse>> doRequest_impl(Reference<S3BlobS
req->data.headers["Host"] = bstore->host;
req->data.headers["Accept"] = "application/xml";

// In simulation, disable connection pooling for MockS3 to prevent NetSAV use-after-free crashes
// This forces connection closure after each request, preventing race conditions during coordinator shutdown
if (g_network->isSimulated() && bstore->host == "127.0.0.1") {
req->data.headers["Connection"] = "close";
}

// Avoid to send request with an empty resource.
if (resource.empty()) {
resource = "/";
Expand Down Expand Up @@ -1140,7 +1154,11 @@ ACTOR Future<Reference<HTTP::IncomingResponse>> doRequest_impl(Reference<S3BlobS
rconn.conn, dryrunRequest, bstore->sendRate, &bstore->s_stats.bytes_sent, bstore->recvRate);
Reference<HTTP::IncomingResponse> _dryrunR = wait(timeoutError(dryrunResponse, requestTimeout));
dryrunR = _dryrunR;
std::string s3Error = parseErrorCodeFromS3(dryrunR->data.content);
// Only parse S3 error code for error responses (4xx/5xx), not successful responses (2xx)
std::string s3Error;
if (dryrunR->code >= 400) {
s3Error = parseErrorCodeFromS3(dryrunR->data.content);
}
if (dryrunR->code == badRequestCode && isS3TokenError(s3Error)) {
// authentication fails and s3 token error persists, retry with a HEAD dryrun request
// to avoid sending duplicate data indefinitly to save network bandwidth
Expand Down Expand Up @@ -1263,7 +1281,12 @@ ACTOR Future<Reference<HTTP::IncomingResponse>> doRequest_impl(Reference<S3BlobS

if (!err.present()) {
event.detail("ResponseCode", r->code);
std::string s3Error = parseErrorCodeFromS3(r->data.content);
// Only parse S3 error code for real error responses (4xx/5xx), not successful responses (2xx)
// Skip parsing for simulated errors where response content is still binary data
std::string s3Error;
if (r->code >= 400 && !simulateS3TokenError) {
s3Error = parseErrorCodeFromS3(r->data.content);
}
event.detail("S3ErrorCode", s3Error);
if (r->code == badRequestCode) {
if (isS3TokenError(s3Error) || simulateS3TokenError) {
Expand Down Expand Up @@ -1460,7 +1483,8 @@ ACTOR Future<Void> listObjectsStream_impl(Reference<S3BlobStoreEndpoint> bstore,
if (key == nullptr) {
throw http_bad_response();
}
object.name = key->value();
// URL decode the object name since S3 XML responses contain URL-encoded names
object.name = HTTP::urlDecode(key->value());

xml_node<>* size = n->first_node("Size");
if (size == nullptr) {
Expand Down Expand Up @@ -2035,8 +2059,11 @@ ACTOR Future<int> readObject_impl(Reference<S3BlobStoreEndpoint> bstore,
try {
// Copy the output bytes, server could have sent more or less bytes than requested so copy at most length
// bytes
memcpy(data, r->data.content.data(), std::min<int64_t>(r->data.contentLen, length));
return r->data.contentLen;
int bytesToCopy = std::min<int64_t>(r->data.contentLen, length);
memcpy(data, r->data.content.data(), bytesToCopy);
// Return the number of bytes actually copied, not the contentLen
// This ensures AsyncFileEncrypted gets blocks of the correct size (4KB)
return bytesToCopy;
} catch (Error& e) {
TraceEvent(SevWarn, "S3BlobStoreReadObjectMemcpyError").detail("Error", e.what());
throw io_error();
Expand Down Expand Up @@ -2441,4 +2468,4 @@ TEST_CASE("/backup/s3/guess_region") {
ASSERT_EQ(e.code(), error_code_backup_invalid_url);
}
return Void();
}
}
69 changes: 62 additions & 7 deletions fdbclient/include/fdbclient/S3BlobStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -117,13 +117,14 @@ class S3BlobStoreEndpoint : public ReferenceCounted<S3BlobStoreEndpoint> {
void maybeStartStatsLogger() {
if (!blobStats && CLIENT_KNOBS->BLOBSTORE_ENABLE_LOGGING) {
blobStats = std::make_unique<BlobStats>();
specialCounter(
blobStats->cc, "GlobalConnectionPoolCount", [this]() { return this->globalConnectionPool.size(); });
specialCounter(blobStats->cc, "GlobalConnectionPoolCount", [this]() {
return this->getGlobalConnectionPool().size();
});
specialCounter(blobStats->cc, "GlobalConnectionPoolSize", [this]() {
// FIXME: could track this explicitly via an int variable with extra logic, but this should be small and
// infrequent
int totalConnections = 0;
for (auto& it : this->globalConnectionPool) {
for (auto& it : this->getGlobalConnectionPool()) {
totalConnections += it.second->pool.size();
}
return totalConnections;
Expand Down Expand Up @@ -200,6 +201,48 @@ class S3BlobStoreEndpoint : public ReferenceCounted<S3BlobStoreEndpoint> {
struct ReusableConnection {
Reference<IConnection> conn;
double expirationTime;
// CROSS_PROCESS_FIX: Track which process created this connection
NetworkAddress creatingProcess;

ReusableConnection() : expirationTime(0) {
if (g_network && g_network->isSimulated()) {
creatingProcess = g_network->getLocalAddress();
}
}

ReusableConnection(Reference<IConnection> c, double exp) : conn(c), expirationTime(exp) {
if (g_network && g_network->isSimulated()) {
creatingProcess = g_network->getLocalAddress();
}
}

// CROSS_PROCESS_FIX: Copy constructor with cross-process detection
ReusableConnection(const ReusableConnection& other)
: conn(other.conn), expirationTime(other.expirationTime), creatingProcess(other.creatingProcess) {
if (g_network && g_network->isSimulated() && creatingProcess.isValid() &&
creatingProcess != g_network->getLocalAddress()) {
// Cross-process copy detected - invalidate the connection to prevent sharing
conn = Reference<IConnection>();
expirationTime = 0; // Mark as expired
}
}

// CROSS_PROCESS_FIX: Assignment operator with cross-process detection
ReusableConnection& operator=(const ReusableConnection& other) {
if (this != &other) {
conn = other.conn;
expirationTime = other.expirationTime;
creatingProcess = other.creatingProcess;

if (g_network && g_network->isSimulated() && creatingProcess.isValid() &&
creatingProcess != g_network->getLocalAddress()) {
// Cross-process assignment detected - invalidate the connection to prevent sharing
conn = Reference<IConnection>();
expirationTime = 0; // Mark as expired
}
}
return *this;
}
};

// basically, reference counted queue with option to add other fields
Expand All @@ -208,7 +251,19 @@ class S3BlobStoreEndpoint : public ReferenceCounted<S3BlobStoreEndpoint> {
};

// global connection pool for multiple blobstore endpoints with same connection settings and request destination
static std::unordered_map<BlobStoreConnectionPoolKey, Reference<ConnectionPoolData>> globalConnectionPool;
// CROSS_PROCESS_FIX: Make connection pool process-local to prevent cross-process connection sharing
static std::unordered_map<BlobStoreConnectionPoolKey, Reference<ConnectionPoolData>>& getGlobalConnectionPool() {
// Use process address as key to separate connection pools per simulated process
static std::map<NetworkAddress, std::unordered_map<BlobStoreConnectionPoolKey, Reference<ConnectionPoolData>>>
processConnectionPools;

NetworkAddress currentProcess;
if (g_network && g_network->isSimulated()) {
currentProcess = g_network->getLocalAddress();
}

return processConnectionPools[currentProcess];
}

S3BlobStoreEndpoint(std::string const& host,
std::string const& service,
Expand Down Expand Up @@ -241,12 +296,12 @@ class S3BlobStoreEndpoint : public ReferenceCounted<S3BlobStoreEndpoint> {
connectionPool = makeReference<ConnectionPoolData>();
} else {
BlobStoreConnectionPoolKey key(host, service, region, knobs.isTLS());
auto it = globalConnectionPool.find(key);
if (it != globalConnectionPool.end()) {
auto it = getGlobalConnectionPool().find(key);
if (it != getGlobalConnectionPool().end()) {
connectionPool = it->second;
} else {
connectionPool = makeReference<ConnectionPoolData>();
globalConnectionPool.insert({ key, connectionPool });
getGlobalConnectionPool().insert({ key, connectionPool });
}
}
ASSERT(connectionPool.isValid());
Expand Down
2 changes: 1 addition & 1 deletion fdbserver/RESTSimKmsVault.actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -836,4 +836,4 @@ TEST_CASE("/restSimKmsVault/GetBlobMetadata/foo") {
VaultResponse response = handleFetchBlobMetada(requestContent);
validateBlobLookup(response, domIds);
return Void();
}
}