Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,8 @@ class RdmaContext {

ibv_cq *cq();

// Returns the completion queue dedicated to diagnostic (connectivity-probe)
// traffic; completions of zero-byte probe messages land here rather than on
// the data-path CQs.
ibv_cq *diagnosticCq() { return diag_cq_; }

volatile int *cqOutstandingCount(int cq_index) {
return &cq_list_[cq_index].outstanding;
}
Expand Down Expand Up @@ -175,6 +177,8 @@ class RdmaContext {
std::shared_ptr<WorkerPool> worker_pool_;

volatile bool active_;

ibv_cq *diag_cq_; // CQ for diagnostic purpose
};

} // namespace mooncake
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ class RdmaEndPoint {
~RdmaEndPoint();

int construct(ibv_cq *cq, size_t num_qp_list = 2, size_t max_sge = 4,
size_t max_wr = 256, size_t max_inline = 64);
size_t max_wr = 256, size_t max_inline = 64,
ibv_cq *diag_cq = nullptr);

private:
int deconstruct();
Expand All @@ -73,9 +74,9 @@ class RdmaEndPoint {

bool active() const { return active_; }

void set_active(bool flag) {
void set_active(bool flag) {
RWSpinlock::WriteGuard guard(lock_);
active_ = flag;
active_ = flag;
if (!flag) inactive_time_ = getCurrentTimeInNano();
}

Expand Down Expand Up @@ -109,6 +110,8 @@ class RdmaEndPoint {
// Failed tasks (which must be submitted) are inserted in failed_slice_list
int submitPostSend(std::vector<Transport::Slice *> &slice_list,
std::vector<Transport::Slice *> &failed_slice_list);

int submitZeroByteMessage();

private:
std::vector<uint32_t> qpNum() const;
Expand Down Expand Up @@ -136,6 +139,7 @@ class RdmaEndPoint {
volatile bool active_;
volatile int *cq_outstanding_;
volatile uint64_t inactive_time_;
bool has_diag_cq_;
};

} // namespace mooncake
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ class WorkerPool {

int doProcessContextEvents();

void connectivityTest(int segment_id,
RdmaTransport::SegmentDesc &remote_desc);

private:
RdmaContext &context_;
const int numa_socket_id_;
Expand All @@ -69,6 +72,9 @@ class WorkerPool {
std::atomic<uint64_t> submitted_slice_count_, processed_slice_count_;

uint64_t success_nr_polls = 0, failed_nr_polls = 0;
std::unordered_set<std::string> probe_unconnected_list;
std::atomic<bool> probe_completed = false;
RWSpinlock probe_lock;
};
} // namespace mooncake

Expand Down
5 changes: 0 additions & 5 deletions mooncake-transfer-engine/src/transfer_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,6 @@ int TransferEngine::init(const std::string &metadata_conn_string,
const std::string &local_server_name,
const std::string &ip_or_host_name,
uint64_t rpc_port) {
LOG(INFO) << "Transfer Engine starting. Server: " << local_server_name
<< ", Metadata: " << metadata_conn_string
<< ", ip_or_host_name: " << ip_or_host_name
<< ", rpc_port: " << rpc_port;

local_server_name_ = local_server_name;
TransferMetadata::RpcMetaDesc desc;
std::string rpc_binding_method;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,9 @@ std::shared_ptr<RdmaEndPoint> FIFOEndpointStore::insertEndpoint(
return nullptr;
}
auto &config = globalConfig();
int ret =
endpoint->construct(context->cq(), config.num_qp_per_ep, config.max_sge,
config.max_wr, config.max_inline);
int ret = endpoint->construct(context->cq(), config.num_qp_per_ep,
config.max_sge, config.max_wr,
config.max_inline, context->diagnosticCq());
if (ret) return nullptr;

while (this->getSize() >= max_size_) evictEndpoint();
Expand Down Expand Up @@ -143,9 +143,9 @@ std::shared_ptr<RdmaEndPoint> SIEVEEndpointStore::insertEndpoint(
return nullptr;
}
auto &config = globalConfig();
int ret =
endpoint->construct(context->cq(), config.num_qp_per_ep, config.max_sge,
config.max_wr, config.max_inline);
int ret = endpoint->construct(context->cq(), config.num_qp_per_ep,
config.max_sge, config.max_wr,
config.max_inline, context->diagnosticCq());
if (ret) return nullptr;

while (this->getSize() >= max_size_) evictEndpoint();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,13 @@ int RdmaContext::construct(size_t num_cq_list, size_t num_comp_channels,
cq_list_[i].native = cq;
}

diag_cq_ = ibv_create_cq(context_, max_cqe, nullptr, nullptr, 0);
if (!diag_cq_) {
PLOG(ERROR) << "Failed to create diagnostic completion queue";
close(event_fd_);
return ERR_CONTEXT;
}

worker_pool_ = std::make_shared<WorkerPool>(*this, socketId());

LOG(INFO) << "RDMA device: " << context_->device->name << ", LID: " << lid_
Expand Down Expand Up @@ -165,6 +172,11 @@ int RdmaContext::deconstruct() {
}
cq_list_.clear();

int ret = ibv_destroy_cq(diag_cq_);
if (ret) {
PLOG(ERROR) << "Failed to destroy completion queue";
}

if (event_fd_ >= 0) {
if (close(event_fd_)) LOG(ERROR) << "Failed to close epoll fd";
event_fd_ = -1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,14 @@ RdmaEndPoint::~RdmaEndPoint() {

int RdmaEndPoint::construct(ibv_cq *cq, size_t num_qp_list,
size_t max_sge_per_wr, size_t max_wr_depth,
size_t max_inline_bytes) {
size_t max_inline_bytes, ibv_cq *diag_cq) {
if (status_.load(std::memory_order_relaxed) != INITIALIZING) {
LOG(ERROR) << "Endpoint has already been constructed";
return ERR_ENDPOINT;
}

has_diag_cq_ = (diag_cq != nullptr);
if (has_diag_cq_) num_qp_list++;
qp_list_.resize(num_qp_list);
cq_outstanding_ = (volatile int *)cq->cq_context;

Expand All @@ -57,6 +59,7 @@ int RdmaEndPoint::construct(ibv_cq *cq, size_t num_qp_list,
wr_depth_list_[i] = 0;
ibv_qp_init_attr attr;
memset(&attr, 0, sizeof(attr));
if (i + 1 == num_qp_list && has_diag_cq_) cq = diag_cq;
attr.send_cq = cq;
attr.recv_cq = cq;
attr.sq_sig_all = false;
Expand Down Expand Up @@ -86,8 +89,8 @@ int RdmaEndPoint::deconstruct() {
bool displayed = false;
if (wr_depth_list_[i] != 0) {
if (!displayed) {
LOG(WARNING)
<< "Outstanding work requests found, CQ will not be generated";
LOG(WARNING) << "Outstanding work requests found, CQ will not "
"be generated";
displayed = true;
}
__sync_fetch_and_sub(cq_outstanding_, wr_depth_list_[i]);
Expand Down Expand Up @@ -236,8 +239,8 @@ void RdmaEndPoint::disconnectUnlocked() {
bool displayed = false;
if (wr_depth_list_[i] != 0) {
if (!displayed) {
LOG(WARNING)
<< "Outstanding work requests found, CQ will not be generated";
LOG(WARNING) << "Outstanding work requests found, CQ will not "
"be generated";
displayed = true;
}
__sync_fetch_and_sub(cq_outstanding_, wr_depth_list_[i]);
Expand Down Expand Up @@ -268,7 +271,18 @@ int RdmaEndPoint::submitPostSend(
std::vector<Transport::Slice *> &failed_slice_list) {
RWSpinlock::WriteGuard guard(lock_);
if (!active_) return 0;
int qp_index = SimpleRandom::Get().next(qp_list_.size());
bool inject_failure = false;
if (getenv("MC_INJECT_FAILURE")) {
if (context_.deviceName() == getenv("MC_INJECT_FAILURE")) {
inject_failure = (SimpleRandom::Get().next(100) % 100 == 0);
}
if (inject_failure) {
LOG(WARNING) << "Injecting a failure on device "
<< context_.nicPath();
}
}
size_t qp_list_size = has_diag_cq_ ? qp_list_.size() - 1 : qp_list_.size();
int qp_index = SimpleRandom::Get().next(qp_list_size);
int wr_count = std::min(max_wr_depth_ - wr_depth_list_[qp_index],
(int)slice_list.size());
wr_count =
Expand All @@ -295,7 +309,7 @@ int RdmaEndPoint::submitPostSend(
wr.send_flags = IBV_SEND_SIGNALED;
wr.next = (i + 1 == wr_count) ? nullptr : &wr_list[i + 1];
wr.imm_data = 0;
wr.wr.rdma.remote_addr = slice->rdma.dest_addr;
wr.wr.rdma.remote_addr = inject_failure ? 0 : slice->rdma.dest_addr;
wr.wr.rdma.rkey = slice->rdma.dest_rkey;
slice->ts = getCurrentTimeInNano();
slice->status = Transport::Slice::POSTED;
Expand All @@ -318,6 +332,30 @@ int RdmaEndPoint::submitPostSend(
return 0;
}

int RdmaEndPoint::submitZeroByteMessage() {
RWSpinlock::WriteGuard guard(lock_);
if (!active_ || !has_diag_cq_) return 0;
ibv_send_wr wr, *bad_wr = nullptr;
ibv_sge sge;
sge.addr = 0;
sge.length = 0;
sge.lkey = 0;
memset(&wr, 0, sizeof(ibv_send_wr));
wr.wr_id = (uint64_t)this;
wr.opcode = IBV_WR_RDMA_READ;
wr.num_sge = 0;
wr.sg_list = &sge;
wr.send_flags = IBV_SEND_SIGNALED;
wr.next = nullptr;
wr.imm_data = 0;
int rc = ibv_post_send(qp_list_[qp_list_.size() - 1], &wr, &bad_wr);
if (rc) {
PLOG(ERROR) << "Failed to ibv_post_send";
return -1;
}
return 0;
}

std::vector<uint32_t> RdmaEndPoint::qpNum() const {
std::vector<uint32_t> ret;
for (int qp_index = 0; qp_index < (int)qp_list_.size(); ++qp_index)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,11 @@ int WorkerPool::submitPostSend(
segment_desc_map;
for (auto &slice : slice_list) {
auto target_id = slice->target_id;
if (!segment_desc_map.count(target_id))
segment_desc_map[target_id] =
context_.engine().meta()->getSegmentDescByID(target_id);
if (!segment_desc_map.count(target_id)) {
auto desc = context_.engine().meta()->getSegmentDescByID(target_id);
segment_desc_map[target_id] = desc;
connectivityTest(target_id, *desc);
}
}
#endif // CONFIG_CACHE_SEGMENT_DESC

Expand Down Expand Up @@ -142,6 +144,13 @@ int WorkerPool::submitPostSend(
auto peer_nic_path =
MakeNicPath(peer_segment_desc->name,
peer_segment_desc->devices[device_id].name);
{
RWSpinlock::ReadGuard guard(probe_lock);
if (probe_unconnected_list.count(peer_nic_path)) {
slice->markFailed();
continue;
}
}
slice->peer_nic_path = peer_nic_path;
int shard_id = (slice->target_id * 10007 + device_id) % kShardCount;
slice_list_map[shard_id].push_back(slice);
Expand Down Expand Up @@ -229,7 +238,8 @@ void WorkerPool::performPostSend(int thread_id) {
}
if (!endpoint->active()) {
if (endpoint->inactiveTime() > 1.0)
context_.deleteEndpoint(entry.first); // enable for re-establishation
context_.deleteEndpoint(
entry.first); // enable for re-establishation
for (auto &slice : entry.second) failed_slice_list.push_back(slice);
entry.second.clear();
continue;
Expand Down Expand Up @@ -452,4 +462,71 @@ void WorkerPool::monitorWorker() {
doProcessContextEvents();
}
}

// One-shot connectivity probe against a remote segment: for every RDMA device
// the remote segment advertises, allocate an endpoint, bring up the
// connection, and post a zero-byte message on the diagnostic QP; then poll
// the diagnostic CQ until every posted probe has completed. Devices that fail
// any step are recorded in probe_unconnected_list (consulted by
// submitPostSend() to fail slices early) and their endpoints are deleted.
// Runs at most once per WorkerPool instance.
// NOTE(review): segment_id is currently unused in this body — confirm it is
// intentional.
void WorkerPool::connectivityTest(int segment_id,
                                  RdmaTransport::SegmentDesc &remote_desc) {
    // Double-checked fast path: probe_completed is std::atomic<bool>, so
    // repeat callers return without touching probe_lock.
    if (probe_completed) return;
    RWSpinlock::WriteGuard guard(probe_lock);
    if (probe_completed) return;
    LOG(INFO) << "Context " << context_.deviceName()
              << ", start connectivity test";
    // Number of probes actually posted; drives the polling loop below.
    int num_packets = 0;
    for (auto &device : remote_desc.devices) {
        // eRDMA doesn't support zero byte message
        if (device.name.starts_with("erdma")) continue;
        auto peer_nic_path = MakeNicPath(remote_desc.name, device.name);
        auto endpoint = context_.endpoint(peer_nic_path);
        if (!endpoint) {
            LOG(INFO) << "Context " << context_.deviceName()
                      << ", cannot allocate endpoint for " << peer_nic_path;
            probe_unconnected_list.insert(peer_nic_path);
            continue;
        }
        if (endpoint->setupConnectionsByActive()) {
            LOG(INFO) << "Context " << context_.deviceName()
                      << ", cannot setup connection for " << peer_nic_path;
            probe_unconnected_list.insert(peer_nic_path);
            continue;
        }
        if (endpoint->submitZeroByteMessage()) {
            LOG(INFO) << "Context " << context_.deviceName()
                      << ", cannot submit zero byte message for "
                      << peer_nic_path;
            probe_unconnected_list.insert(peer_nic_path);
            continue;
        }
        num_packets++;
    }
    // Drain the diagnostic CQ until every posted probe has completed.
    // NOTE(review): there is no timeout here — a probe whose completion never
    // arrives would spin this loop forever; confirm an upper bound is
    // acceptable or add one.
    while (num_packets) {
        ibv_wc wc_list[16];
        int nr_poll = ibv_poll_cq(context_.diagnosticCq(), 16, wc_list);
        if (nr_poll < 0) {
            // Polling itself failed: conservatively mark every remote device
            // unreachable. NOTE(review): this also marks devices that were
            // skipped (erdma) or already probed successfully — confirm this
            // blanket failure is intended.
            LOG(INFO) << "Context " << context_.deviceName()
                      << ", found ibv_poll_cq error for all devices";
            for (auto &device : remote_desc.devices) {
                auto peer_nic_path = MakeNicPath(remote_desc.name, device.name);
                probe_unconnected_list.insert(peer_nic_path);
            }
            return;
        }
        for (int i = 0; i < nr_poll; ++i) {
            if (wc_list[i].status != IBV_WC_SUCCESS) {
                // NOTE(review): this assumes wr_id carries an RdmaContext* —
                // verify the poster (RdmaEndPoint::submitZeroByteMessage)
                // stores the context pointer and not the endpoint pointer.
                auto context = (RdmaContext *)wc_list[i].wr_id;
                auto peer_nic_path =
                    MakeNicPath(remote_desc.name, context->deviceName());
                probe_unconnected_list.insert(peer_nic_path);
                LOG(INFO) << "Context " << context_.deviceName()
                          << ", found work completion error for "
                          << peer_nic_path;
            }
        }
        // Assumes every CQE on the diagnostic CQ belongs to this probe round.
        num_packets -= nr_poll;
    }
    LOG(INFO) << "Connectivity test detected phase completed";
    // Tear down endpoints to unreachable peers so they can be re-created if
    // connectivity is restored later.
    for (auto &entry : probe_unconnected_list) {
        context_.deleteEndpoint(entry);
    }
    LOG(INFO) << "Connectivity test completed";
    probe_completed = true;
}
} // namespace mooncake
Loading