Skip to content

Commit a7851fd

Browse files
authored
[Libfabric] Efa direct fallback and device mapping improvements (#817)
- Refactor libfabric device discovery and connection management - Change getAvailableEfaDevices() to return provider-device mapping - Update connection state handling in disconnect logic - Modify rail manager to support provider-aware device selection - Enhance topology detection for multi-provider environments - Refactor libfabric memory registration to support device-specific operations - Add device ID parameter to memory registration functions - Update rail manager to handle device-aware memory registration - Improve code formatting and line wrapping consistency - Enhance topology management for multi-device scenarios - Remove numa node mapping with EFA, instead select all rails in case of DRAM - Clean up dead code to improve maintainability. - Remove large blocks of commented-out code in nixlLibfabricEngine::disconnect() and simplify libfabric common utilities. --------- Signed-off-by: Arun Karthik <[email protected]>
1 parent c15e64e commit a7851fd

File tree

10 files changed

+139
-419
lines changed

10 files changed

+139
-419
lines changed

src/plugins/libfabric/libfabric_backend.cpp

Lines changed: 3 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -513,86 +513,15 @@ nixlLibfabricEngine::disconnect(const std::string &remote_agent) {
513513
<< ", fi_addr: " << it->second->rail_remote_addr_list_[0];
514514
return NIXL_SUCCESS;
515515
}
516-
517-
NIXL_DEBUG << "Disconnecting from agent: " << remote_agent;
518-
519-
if (remote_agent != localAgent) {
520-
// Send disconnect control message to remote peer - fire and forget semantics
521-
NIXL_DEBUG << "Sending disconnect notification to remote agent: " << remote_agent;
522-
523-
// Use rail manager's serialization method with "src" prefix (we are sending our source
524-
// endpoints)
525-
std::string serialized_conn_info;
526-
nixl_status_t serialize_status =
527-
rail_manager.serializeConnectionInfo("src", serialized_conn_info);
528-
if (serialize_status != NIXL_SUCCESS) {
529-
NIXL_ERROR << "Rail manager serializeConnectionInfo failed";
530-
return serialize_status;
531-
}
532-
533-
// Allocate control request
534-
const size_t control_rail_id = 0;
535-
const size_t buffer_size =
536-
(rail_manager.getNumDataRails() + rail_manager.getNumControlRails()) *
537-
LF_EP_NAME_MAX_LEN +
538-
1024;
539-
nixlLibfabricReq *control_request =
540-
rail_manager.getControlRail(control_rail_id).allocateControlRequest(buffer_size);
541-
if (!control_request) {
542-
NIXL_ERROR << "Failed to allocate control request for disconnect";
543-
return NIXL_ERR_BACKEND;
544-
}
545-
546-
memcpy(control_request->buffer, serialized_conn_info.data(), serialized_conn_info.length());
547-
548-
// Set the actual size of serialized data
549-
control_request->buffer_size = serialized_conn_info.length();
550-
551-
nixl_status_t status = rail_manager.postControlMessage(
552-
nixlLibfabricRailManager::ControlMessageType::DISCONNECT_REQ,
553-
control_request,
554-
it->second->control_rail_remote_addr_list_[0], // Use control rail 0
555-
it->second->agent_index_);
556-
557-
if (status != NIXL_SUCCESS) {
558-
NIXL_WARN << "Failed to send disconnect notification to " << remote_agent
559-
<< ", proceeding with local cleanup anyway";
560-
// Continue with cleanup even if notification failed
561-
} else {
562-
NIXL_DEBUG << "Disconnect notification sent successfully to " << remote_agent;
563-
}
564-
} else {
565-
NIXL_DEBUG << "Skipping disconnect notification for self-connection";
566-
}
567-
568-
// Clean up libfabric resources (AV entries) via rail manager
569-
NIXL_DEBUG << "Cleaning up libfabric resources for agent: " << remote_agent;
570-
// Clean up libfabric per-connection AV cleanup for both data and control rails
571-
nixl_status_t data_cleanup_status = rail_manager.cleanupConnection(
572-
nixlLibfabricRailManager::RailType::DATA, it->second->rail_remote_addr_list_);
573-
if (data_cleanup_status != NIXL_SUCCESS) {
574-
NIXL_ERROR << "Failed to clean up data rail resources for agent: " << remote_agent
575-
<< " with status: " << data_cleanup_status;
576-
return data_cleanup_status;
577-
}
578-
579-
nixl_status_t control_cleanup_status = rail_manager.cleanupConnection(
580-
nixlLibfabricRailManager::RailType::CONTROL, it->second->control_rail_remote_addr_list_);
581-
if (control_cleanup_status != NIXL_SUCCESS) {
582-
NIXL_ERROR << "Failed to clean up control rail resources for agent: " << remote_agent
583-
<< " with status: " << control_cleanup_status;
584-
return control_cleanup_status;
585-
}
586-
587-
NIXL_DEBUG << "Successfully cleaned up libfabric resources for agent: " << remote_agent;
516+
// TODO: Implement disconnect logic to cleanup the AV Address Entries from both local and remote
517+
// AV.
588518

589519
// Update connection state to DISCONNECTED before removing
590520
it->second->overall_state_ = ConnectionState::DISCONNECTED;
591521

592522
// Remove connection from map
593523
connections_.erase(remote_agent);
594524
NIXL_DEBUG << "Connection erased from the connection map for agent: " << remote_agent;
595-
596525
return NIXL_SUCCESS;
597526
}
598527

@@ -837,6 +766,7 @@ nixlLibfabricEngine::registerMem(const nixlBlobDesc &mem,
837766
nixl_status_t status = rail_manager.registerMemory((void *)mem.addr,
838767
mem.len,
839768
nixl_mem,
769+
mem.devId,
840770
priv->rail_mr_list_,
841771
priv->rail_key_list_,
842772
priv->selected_rails_);

src/utils/libfabric/libfabric_common.cpp

Lines changed: 42 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -22,39 +22,74 @@
2222
#include <iomanip>
2323
#include <sstream>
2424
#include <atomic>
25+
#include <cstring>
2526

2627
#include <rdma/fabric.h>
2728
#include <rdma/fi_domain.h>
2829

2930
namespace LibfabricUtils {
3031

31-
std::vector<std::string>
32+
33+
std::pair<std::string, std::vector<std::string>>
3234
getAvailableEfaDevices() {
33-
std::vector<std::string> devices;
35+
std::unordered_map<std::string, std::vector<std::string>> provider_devices_map;
36+
std::vector<std::string> all_efa_devices;
37+
std::string fabric_name;
3438
struct fi_info *hints, *info;
3539
hints = fi_allocinfo();
3640
if (!hints) {
3741
NIXL_ERROR << "Failed to allocate fi_info for device discovery";
38-
return devices;
42+
return {fabric_name, all_efa_devices};
3943
}
4044

45+
// Important to initialize this to allow differentiation between EFA and EFA-Direct
46+
hints->mode = ~0;
47+
48+
// Set required capabilities - let libfabric select the best provider
49+
hints->caps = FI_READ | FI_WRITE | FI_RECV | FI_SEND | FI_REMOTE_READ | FI_REMOTE_WRITE |
50+
FI_LOCAL_COMM | FI_REMOTE_COMM;
4151
hints->fabric_attr->prov_name = strdup("efa");
52+
hints->ep_attr->type = FI_EP_RDM;
53+
4254
int ret = fi_getinfo(FI_VERSION(1, 9), NULL, NULL, 0, hints, &info);
4355
if (ret) {
4456
NIXL_ERROR << "fi_getinfo failed during device discovery: " << fi_strerror(-ret);
4557
fi_freeinfo(hints);
46-
return devices;
58+
return {fabric_name, all_efa_devices};
4759
}
4860

61+
// Process providers and filter for EFA providers with RMA capabilities
4962
for (struct fi_info *cur = info; cur; cur = cur->next) {
50-
if (cur->domain_attr && cur->domain_attr->name) {
51-
devices.push_back(cur->domain_attr->name);
63+
if (cur->domain_attr && cur->domain_attr->name && cur->fabric_attr &&
64+
cur->fabric_attr->name) {
65+
66+
std::string device_name = cur->domain_attr->name;
67+
std::string provider_name = cur->fabric_attr->name;
68+
69+
// Add device to the appropriate provider's vector
70+
provider_devices_map[provider_name].push_back(device_name);
71+
72+
NIXL_TRACE << "Found EFA device: " << device_name << " with provider: " << provider_name
73+
<< " (caps: 0x" << std::hex << cur->caps << std::dec << ")";
5274
}
5375
}
5476

5577
fi_freeinfo(info);
5678
fi_freeinfo(hints);
57-
return devices;
79+
80+
// Extract device names from the map, prioritizing efa-direct over efa
81+
all_efa_devices.clear();
82+
if (provider_devices_map.find("efa-direct") != provider_devices_map.end()) {
83+
all_efa_devices = provider_devices_map["efa-direct"];
84+
fabric_name = "efa-direct";
85+
NIXL_TRACE << "Using efa-direct provider with " << all_efa_devices.size() << " devices";
86+
} else if (provider_devices_map.find("efa") != provider_devices_map.end()) {
87+
all_efa_devices = provider_devices_map["efa"];
88+
fabric_name = "efa";
89+
NIXL_TRACE << "Using efa provider with " << all_efa_devices.size() << " devices";
90+
}
91+
92+
return {fabric_name, all_efa_devices};
5893
}
5994

6095
std::string

src/utils/libfabric/libfabric_common.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
#include <vector>
2222
#include <string>
2323
#include <unordered_set>
24+
#include <unordered_map>
2425
#include <cstring>
2526

2627
#include "nixl.h"
@@ -151,7 +152,7 @@ preallocateXferIds(size_t count);
151152
// Utility functions
152153
namespace LibfabricUtils {
153154
// Device discovery
154-
std::vector<std::string>
155+
std::pair<std::string, std::vector<std::string>>
155156
getAvailableEfaDevices();
156157
// String utilities
157158
std::string

src/utils/libfabric/libfabric_rail.cpp

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -187,7 +187,9 @@ DataRequestPool::allocate(nixlLibfabricReq::OpType op_type) {
187187

188188
// Rail Class Implementation
189189

190-
nixlLibfabricRail::nixlLibfabricRail(const std::string &device, uint16_t id)
190+
nixlLibfabricRail::nixlLibfabricRail(const std::string &device,
191+
const std::string &provider,
192+
uint16_t id)
191193
: rail_id(id),
192194
device_name(device),
193195
blocking_cq_sread_supported(true),
@@ -203,7 +205,8 @@ nixlLibfabricRail::nixlLibfabricRail(const std::string &device, uint16_t id)
203205
memset(ep_name, 0, sizeof(ep_name));
204206

205207
// Initialize all Libfabric resources for this rail
206-
NIXL_TRACE << "Initializing rail " << rail_id << " with device: " << device_name;
208+
NIXL_TRACE << "Initializing rail " << rail_id << " with device: " << device_name
209+
<< ", provider: " << provider;
207210

208211
// Initialize hints for this rail
209212
struct fi_info *hints = fi_allocinfo();
@@ -212,9 +215,13 @@ nixlLibfabricRail::nixlLibfabricRail(const std::string &device, uint16_t id)
212215
throw std::runtime_error("Failed to allocate fi_info for rail " + std::to_string(rail_id));
213216
}
214217
hints->caps = 0;
215-
hints->caps = FI_MSG | FI_RMA | FI_HMEM;
218+
hints->caps = FI_MSG | FI_RMA;
216219
hints->caps |= FI_LOCAL_COMM | FI_REMOTE_COMM;
217-
hints->mode = FI_CONTEXT | FI_CONTEXT2;
220+
if (provider.c_str() == std::string("efa-direct")) {
221+
hints->mode = FI_CONTEXT | FI_CONTEXT2;
222+
} else {
223+
hints->mode = FI_CONTEXT;
224+
}
218225
hints->ep_attr->type = FI_EP_RDM;
219226
hints->domain_attr->mr_mode =
220227
FI_MR_LOCAL | FI_MR_HMEM | FI_MR_VIRT_ADDR | FI_MR_ALLOCATED | FI_MR_PROV_KEY;
@@ -250,8 +257,8 @@ nixlLibfabricRail::nixlLibfabricRail(const std::string &device, uint16_t id)
250257
cq_attr.size = 12288;
251258
ret = fi_cq_open(domain, &cq_attr, &cq, NULL);
252259
if (ret) {
253-
NIXL_ERROR << "fi_cq_open failed for rail " << rail_id << ": " << fi_strerror(-ret)
254-
<< " - trying FI_WAIT_NONE";
260+
NIXL_WARN << "fi_cq_open failed for rail " << rail_id << ": " << fi_strerror(-ret)
261+
<< " - trying FI_WAIT_NONE";
255262
if (ret == -FI_ENOSYS) {
256263
NIXL_TRACE << "FI_WAIT_UNSPEC not supported, falling back to FI_WAIT_NONE for rail "
257264
<< rail_id;

src/utils/libfabric/libfabric_rail.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -214,7 +214,7 @@ class nixlLibfabricRail {
214214
struct fid_ep *endpoint; ///< Libfabric endpoint handle
215215

216216
/** Initialize libfabric rail with all resources */
217-
nixlLibfabricRail(const std::string &device, uint16_t id);
217+
nixlLibfabricRail(const std::string &device, const std::string &provider, uint16_t id);
218218

219219
/** Destroy rail and cleanup all libfabric resources */
220220
~nixlLibfabricRail();

0 commit comments

Comments
 (0)