diff --git a/mooncake-transfer-engine/include/transport/rdma_transport/rdma_context.h b/mooncake-transfer-engine/include/transport/rdma_transport/rdma_context.h
index d068592c9..bbc2fd773 100644
--- a/mooncake-transfer-engine/include/transport/rdma_transport/rdma_context.h
+++ b/mooncake-transfer-engine/include/transport/rdma_transport/rdma_context.h
@@ -117,6 +117,8 @@ class RdmaContext {
 
     ibv_cq *cq();
 
+    ibv_cq *diagnosticCq() { return diag_cq_; }
+
     volatile int *cqOutstandingCount(int cq_index) {
         return &cq_list_[cq_index].outstanding;
     }
@@ -175,6 +177,8 @@ class RdmaContext {
     std::shared_ptr<WorkerPool> worker_pool_;
 
     volatile bool active_;
+
+    ibv_cq *diag_cq_;  // CQ for diagnostic purposes
 };
 
 }  // namespace mooncake
diff --git a/mooncake-transfer-engine/include/transport/rdma_transport/rdma_endpoint.h b/mooncake-transfer-engine/include/transport/rdma_transport/rdma_endpoint.h
index 9c583d152..d7fb8fc7a 100644
--- a/mooncake-transfer-engine/include/transport/rdma_transport/rdma_endpoint.h
+++ b/mooncake-transfer-engine/include/transport/rdma_transport/rdma_endpoint.h
@@ -50,7 +50,8 @@ class RdmaEndPoint {
     ~RdmaEndPoint();
 
     int construct(ibv_cq *cq, size_t num_qp_list = 2, size_t max_sge = 4,
-                  size_t max_wr = 256, size_t max_inline = 64);
+                  size_t max_wr = 256, size_t max_inline = 64,
+                  ibv_cq *diag_cq = nullptr);
 
   private:
     int deconstruct();
@@ -73,9 +74,9 @@ class RdmaEndPoint {
 
     bool active() const { return active_; }
 
-    void set_active(bool flag) { 
+    void set_active(bool flag) {
         RWSpinlock::WriteGuard guard(lock_);
-        active_ = flag; 
+        active_ = flag;
         if (!flag) inactive_time_ = getCurrentTimeInNano();
     }
 
@@ -109,6 +110,8 @@ class RdmaEndPoint {
     // Failed tasks (which must be submitted) are inserted in failed_slice_list
     int submitPostSend(std::vector<Transport::Slice *> &slice_list,
                        std::vector<Transport::Slice *> &failed_slice_list);
+
+    int submitZeroByteMessage();
 
   private:
     std::vector<uint32_t> qpNum() const;
@@ -136,6 +139,7 @@ class RdmaEndPoint {
     volatile bool active_;
     volatile int *cq_outstanding_;
     volatile uint64_t inactive_time_;
+    bool has_diag_cq_;
 };
 
 }  // namespace mooncake
diff --git a/mooncake-transfer-engine/include/transport/rdma_transport/worker_pool.h b/mooncake-transfer-engine/include/transport/rdma_transport/worker_pool.h
index f6b24248e..fe27e61ca 100644
--- a/mooncake-transfer-engine/include/transport/rdma_transport/worker_pool.h
+++ b/mooncake-transfer-engine/include/transport/rdma_transport/worker_pool.h
@@ -43,6 +43,9 @@ class WorkerPool {
 
     int doProcessContextEvents();
 
+    void connectivityTest(int segment_id,
+                          RdmaTransport::SegmentDesc &remote_desc);
+
   private:
     RdmaContext &context_;
     const int numa_socket_id_;
@@ -69,6 +72,9 @@ class WorkerPool {
     std::atomic<uint64_t> submitted_slice_count_, processed_slice_count_;
 
     uint64_t success_nr_polls = 0, failed_nr_polls = 0;
+    std::unordered_set<std::string> probe_unconnected_list;
+    std::atomic<bool> probe_completed = false;
+    RWSpinlock probe_lock;
 };
 
 }  // namespace mooncake
diff --git a/mooncake-transfer-engine/src/transfer_engine.cpp b/mooncake-transfer-engine/src/transfer_engine.cpp
index 3f62b1ae5..335343059 100644
--- a/mooncake-transfer-engine/src/transfer_engine.cpp
+++ b/mooncake-transfer-engine/src/transfer_engine.cpp
@@ -41,11 +41,6 @@ int TransferEngine::init(const std::string &metadata_conn_string,
                          const std::string &local_server_name,
                          const std::string &ip_or_host_name,
                          uint64_t rpc_port) {
-    LOG(INFO) << "Transfer Engine starting. Server: " << local_server_name
-              << ", Metadata: " << metadata_conn_string
-              << ", ip_or_host_name: " << ip_or_host_name
-              << ", rpc_port: " << rpc_port;
-
     local_server_name_ = local_server_name;
     TransferMetadata::RpcMetaDesc desc;
     std::string rpc_binding_method;
diff --git a/mooncake-transfer-engine/src/transport/rdma_transport/endpoint_store.cpp b/mooncake-transfer-engine/src/transport/rdma_transport/endpoint_store.cpp
index 3934361de..705e9d3e7 100644
--- a/mooncake-transfer-engine/src/transport/rdma_transport/endpoint_store.cpp
+++ b/mooncake-transfer-engine/src/transport/rdma_transport/endpoint_store.cpp
@@ -49,9 +49,9 @@ std::shared_ptr<RdmaEndPoint> FIFOEndpointStore::insertEndpoint(
         return nullptr;
     }
     auto &config = globalConfig();
-    int ret =
-        endpoint->construct(context->cq(), config.num_qp_per_ep, config.max_sge,
-                            config.max_wr, config.max_inline);
+    int ret = endpoint->construct(context->cq(), config.num_qp_per_ep,
+                                  config.max_sge, config.max_wr,
+                                  config.max_inline, context->diagnosticCq());
     if (ret) return nullptr;
 
     while (this->getSize() >= max_size_) evictEndpoint();
@@ -143,9 +143,9 @@ std::shared_ptr<RdmaEndPoint> SIEVEEndpointStore::insertEndpoint(
         return nullptr;
     }
     auto &config = globalConfig();
-    int ret =
-        endpoint->construct(context->cq(), config.num_qp_per_ep, config.max_sge,
-                            config.max_wr, config.max_inline);
+    int ret = endpoint->construct(context->cq(), config.num_qp_per_ep,
+                                  config.max_sge, config.max_wr,
+                                  config.max_inline, context->diagnosticCq());
     if (ret) return nullptr;
 
     while (this->getSize() >= max_size_) evictEndpoint();
diff --git a/mooncake-transfer-engine/src/transport/rdma_transport/rdma_context.cpp b/mooncake-transfer-engine/src/transport/rdma_transport/rdma_context.cpp
index fbf556037..b4758e817 100644
--- a/mooncake-transfer-engine/src/transport/rdma_transport/rdma_context.cpp
+++ b/mooncake-transfer-engine/src/transport/rdma_transport/rdma_context.cpp
@@ -120,6 +120,13 @@ int RdmaContext::construct(size_t num_cq_list, size_t num_comp_channels,
         cq_list_[i].native = cq;
     }
 
+    diag_cq_ = ibv_create_cq(context_, max_cqe, nullptr, nullptr, 0);
+    if (!diag_cq_) {
+        PLOG(ERROR) << "Failed to create diagnostic completion queue";
+        close(event_fd_);
+        return ERR_CONTEXT;
+    }
+
     worker_pool_ = std::make_shared<WorkerPool>(*this, socketId());
 
     LOG(INFO) << "RDMA device: " << context_->device->name << ", LID: " << lid_
@@ -165,6 +172,11 @@ int RdmaContext::deconstruct() {
     }
     cq_list_.clear();
 
+    int ret = ibv_destroy_cq(diag_cq_);
+    if (ret) {
+        PLOG(ERROR) << "Failed to destroy diagnostic completion queue";
+    }
+
     if (event_fd_ >= 0) {
         if (close(event_fd_)) LOG(ERROR) << "Failed to close epoll fd";
         event_fd_ = -1;
diff --git a/mooncake-transfer-engine/src/transport/rdma_transport/rdma_endpoint.cpp b/mooncake-transfer-engine/src/transport/rdma_transport/rdma_endpoint.cpp
index 6c5429a4a..23fa3477e 100644
--- a/mooncake-transfer-engine/src/transport/rdma_transport/rdma_endpoint.cpp
+++ b/mooncake-transfer-engine/src/transport/rdma_transport/rdma_endpoint.cpp
@@ -38,12 +38,14 @@ RdmaEndPoint::~RdmaEndPoint() {
 
 int RdmaEndPoint::construct(ibv_cq *cq, size_t num_qp_list,
                             size_t max_sge_per_wr, size_t max_wr_depth,
-                            size_t max_inline_bytes) {
+                            size_t max_inline_bytes, ibv_cq *diag_cq) {
     if (status_.load(std::memory_order_relaxed) != INITIALIZING) {
         LOG(ERROR) << "Endpoint has already been constructed";
        return ERR_ENDPOINT;
     }
 
+    has_diag_cq_ = (diag_cq != nullptr);
+    if (has_diag_cq_) num_qp_list++;
     qp_list_.resize(num_qp_list);
 
     cq_outstanding_ = (volatile int *)cq->cq_context;
@@ -57,6 +59,7 @@ int RdmaEndPoint::construct(ibv_cq *cq, size_t num_qp_list,
         wr_depth_list_[i] = 0;
         ibv_qp_init_attr attr;
         memset(&attr, 0, sizeof(attr));
+        if (i + 1 == num_qp_list && has_diag_cq_) cq = diag_cq;
         attr.send_cq = cq;
         attr.recv_cq = cq;
         attr.sq_sig_all = false;
@@ -86,8 +89,8 @@ int RdmaEndPoint::deconstruct() {
         bool displayed = false;
         if (wr_depth_list_[i] != 0) {
             if (!displayed) {
-                LOG(WARNING)
-                    << "Outstanding work requests found, CQ will not be generated";
+                LOG(WARNING) << "Outstanding work requests found, CQ will not "
+                                "be generated";
                 displayed = true;
             }
             __sync_fetch_and_sub(cq_outstanding_, wr_depth_list_[i]);
@@ -236,8 +239,8 @@ void RdmaEndPoint::disconnectUnlocked() {
         bool displayed = false;
         if (wr_depth_list_[i] != 0) {
             if (!displayed) {
-                LOG(WARNING)
-                    << "Outstanding work requests found, CQ will not be generated";
+                LOG(WARNING) << "Outstanding work requests found, CQ will not "
+                                "be generated";
                 displayed = true;
             }
             __sync_fetch_and_sub(cq_outstanding_, wr_depth_list_[i]);
@@ -268,7 +271,18 @@ int RdmaEndPoint::submitPostSend(
     std::vector<Transport::Slice *> &failed_slice_list) {
     RWSpinlock::WriteGuard guard(lock_);
     if (!active_) return 0;
-    int qp_index = SimpleRandom::Get().next(qp_list_.size());
+    bool inject_failure = false;
+    if (getenv("MC_INJECT_FAILURE")) {
+        if (context_.deviceName() == getenv("MC_INJECT_FAILURE")) {
+            inject_failure = (SimpleRandom::Get().next(100) % 100 == 0);
+        }
+        if (inject_failure) {
+            LOG(WARNING) << "Injecting a failure on device "
+                         << context_.nicPath();
+        }
+    }
+    size_t qp_list_size = has_diag_cq_ ? qp_list_.size() - 1 : qp_list_.size();
+    int qp_index = SimpleRandom::Get().next(qp_list_size);
     int wr_count = std::min(max_wr_depth_ - wr_depth_list_[qp_index],
                             (int)slice_list.size());
     wr_count =
@@ -295,7 +309,7 @@ int RdmaEndPoint::submitPostSend(
         wr.send_flags = IBV_SEND_SIGNALED;
         wr.next = (i + 1 == wr_count) ? nullptr : &wr_list[i + 1];
         wr.imm_data = 0;
-        wr.wr.rdma.remote_addr = slice->rdma.dest_addr;
+        wr.wr.rdma.remote_addr = inject_failure ? 0 : slice->rdma.dest_addr;
         wr.wr.rdma.rkey = slice->rdma.dest_rkey;
         slice->ts = getCurrentTimeInNano();
         slice->status = Transport::Slice::POSTED;
@@ -318,6 +332,30 @@ int RdmaEndPoint::submitPostSend(
     return 0;
 }
 
+int RdmaEndPoint::submitZeroByteMessage() {
+    RWSpinlock::WriteGuard guard(lock_);
+    if (!active_ || !has_diag_cq_) return 0;
+    ibv_send_wr wr, *bad_wr = nullptr;
+    ibv_sge sge;
+    sge.addr = 0;
+    sge.length = 0;
+    sge.lkey = 0;
+    memset(&wr, 0, sizeof(ibv_send_wr));
+    wr.wr_id = (uint64_t)this;
+    wr.opcode = IBV_WR_RDMA_READ;
+    wr.num_sge = 0;
+    wr.sg_list = &sge;
+    wr.send_flags = IBV_SEND_SIGNALED;
+    wr.next = nullptr;
+    wr.imm_data = 0;
+    int rc = ibv_post_send(qp_list_[qp_list_.size() - 1], &wr, &bad_wr);
+    if (rc) {
+        PLOG(ERROR) << "Failed to ibv_post_send";
+        return -1;
+    }
+    return 0;
+}
+
 std::vector<uint32_t> RdmaEndPoint::qpNum() const {
     std::vector<uint32_t> ret;
     for (int qp_index = 0; qp_index < (int)qp_list_.size(); ++qp_index)
diff --git a/mooncake-transfer-engine/src/transport/rdma_transport/worker_pool.cpp b/mooncake-transfer-engine/src/transport/rdma_transport/worker_pool.cpp
index cce544009..b05fccc15 100644
--- a/mooncake-transfer-engine/src/transport/rdma_transport/worker_pool.cpp
+++ b/mooncake-transfer-engine/src/transport/rdma_transport/worker_pool.cpp
@@ -89,9 +89,11 @@ int WorkerPool::submitPostSend(
         segment_desc_map;
     for (auto &slice : slice_list) {
         auto target_id = slice->target_id;
-        if (!segment_desc_map.count(target_id))
-            segment_desc_map[target_id] =
-                context_.engine().meta()->getSegmentDescByID(target_id);
+        if (!segment_desc_map.count(target_id)) {
+            auto desc = context_.engine().meta()->getSegmentDescByID(target_id);
+            segment_desc_map[target_id] = desc;
+            connectivityTest(target_id, *desc);
+        }
     }
 
 #endif  // CONFIG_CACHE_SEGMENT_DESC
@@ -142,6 +144,13 @@ int WorkerPool::submitPostSend(
         auto peer_nic_path =
             MakeNicPath(peer_segment_desc->name,
                         peer_segment_desc->devices[device_id].name);
+        {
+            RWSpinlock::ReadGuard guard(probe_lock);
+            if (probe_unconnected_list.count(peer_nic_path)) {
+                slice->markFailed();
+                continue;
+            }
+        }
         slice->peer_nic_path = peer_nic_path;
         int shard_id = (slice->target_id * 10007 + device_id) % kShardCount;
         slice_list_map[shard_id].push_back(slice);
@@ -229,7 +238,8 @@ void WorkerPool::performPostSend(int thread_id) {
             }
             if (!endpoint->active()) {
                 if (endpoint->inactiveTime() > 1.0)
-                    context_.deleteEndpoint(entry.first); // enable for re-establishation
+                    context_.deleteEndpoint(
+                        entry.first);  // enable re-establishment
                 for (auto &slice : entry.second) failed_slice_list.push_back(slice);
                 entry.second.clear();
                 continue;
@@ -452,4 +462,71 @@ void WorkerPool::monitorWorker() {
         doProcessContextEvents();
     }
 }
+
+void WorkerPool::connectivityTest(int segment_id,
+                                  RdmaTransport::SegmentDesc &remote_desc) {
+    if (probe_completed) return;
+    RWSpinlock::WriteGuard guard(probe_lock);
+    if (probe_completed) return;
+    LOG(INFO) << "Context " << context_.deviceName()
+              << ", start connectivity test";
+    int num_packets = 0;
+    for (auto &device : remote_desc.devices) {
+        // eRDMA doesn't support zero-byte messages
+        if (device.name.starts_with("erdma")) continue;
+        auto peer_nic_path = MakeNicPath(remote_desc.name, device.name);
+        auto endpoint = context_.endpoint(peer_nic_path);
+        if (!endpoint) {
+            LOG(INFO) << "Context " << context_.deviceName()
+                      << ", cannot allocate endpoint for " << peer_nic_path;
+            probe_unconnected_list.insert(peer_nic_path);
+            continue;
+        }
+        if (endpoint->setupConnectionsByActive()) {
+            LOG(INFO) << "Context " << context_.deviceName()
+                      << ", cannot setup connection for " << peer_nic_path;
+            probe_unconnected_list.insert(peer_nic_path);
+            continue;
+        }
+        if (endpoint->submitZeroByteMessage()) {
+            LOG(INFO) << "Context " << context_.deviceName()
+                      << ", cannot submit zero byte message for "
+                      << peer_nic_path;
+            probe_unconnected_list.insert(peer_nic_path);
+            continue;
+        }
+        num_packets++;
+    }
+    while (num_packets) {
+        ibv_wc wc_list[16];
+        int nr_poll = ibv_poll_cq(context_.diagnosticCq(), 16, wc_list);
+        if (nr_poll < 0) {
+            LOG(INFO) << "Context " << context_.deviceName()
+                      << ", found ibv_poll_cq error for all devices";
+            for (auto &device : remote_desc.devices) {
+                auto peer_nic_path = MakeNicPath(remote_desc.name, device.name);
+                probe_unconnected_list.insert(peer_nic_path);
+            }
+            return;
+        }
+        for (int i = 0; i < nr_poll; ++i) {
+            if (wc_list[i].status != IBV_WC_SUCCESS) {
+                auto context = (RdmaContext *)wc_list[i].wr_id;
+                auto peer_nic_path =
+                    MakeNicPath(remote_desc.name, context->deviceName());
+                probe_unconnected_list.insert(peer_nic_path);
+                LOG(INFO) << "Context " << context_.deviceName()
+                          << ", found work completion error for "
+                          << peer_nic_path;
+            }
+        }
+        num_packets -= nr_poll;
+    }
+    LOG(INFO) << "Connectivity test detection phase completed";
+    for (auto &entry : probe_unconnected_list) {
+        context_.deleteEndpoint(entry);
+    }
+    LOG(INFO) << "Connectivity test completed";
+    probe_completed = true;
+}
 }  // namespace mooncake
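Note (not part of the patch): the connectivity probe above uses a standard ibverbs pattern: post a signaled zero-byte RDMA READ on a QP whose send CQ is the dedicated diagnostic CQ, then poll that CQ and treat any non-success completion as an unreachable peer, so probe errors never mix with data-path completions on cq(). The sketch below is a minimal stand-alone illustration of that pattern, mirroring submitZeroByteMessage() and the polling loop in connectivityTest(); the helper names post_zero_byte_probe and poll_probe_results are illustrative only, and an already-connected RC QP plus the diagnostic CQ are assumed to be supplied by the caller.

#include <infiniband/verbs.h>

#include <cstdint>
#include <cstring>

// Post one signaled zero-byte RDMA READ on `qp`. With num_sge == 0 no memory
// is actually read, so the completion status only reflects whether the path
// to the peer works. Returns the ibv_post_send() result (0 on success).
static int post_zero_byte_probe(ibv_qp *qp, uint64_t cookie) {
    ibv_sge sge;
    std::memset(&sge, 0, sizeof(sge));  // addr/length/lkey all zero

    ibv_send_wr wr, *bad_wr = nullptr;
    std::memset(&wr, 0, sizeof(wr));
    wr.wr_id = cookie;                  // identifies the peer when polled
    wr.opcode = IBV_WR_RDMA_READ;
    wr.sg_list = &sge;
    wr.num_sge = 0;
    wr.send_flags = IBV_SEND_SIGNALED;  // we need a completion to poll
    return ibv_post_send(qp, &wr, &bad_wr);
}

// Busy-poll `expected` completions from the diagnostic CQ and count how many
// probes failed (e.g. IBV_WC_RETRY_EXC_ERR => peer unreachable).
// Returns -1 if the CQ itself reports a polling error.
static int poll_probe_results(ibv_cq *diag_cq, int expected) {
    int failed = 0;
    while (expected > 0) {
        ibv_wc wc[16];
        int n = ibv_poll_cq(diag_cq, 16, wc);
        if (n < 0) return -1;
        for (int i = 0; i < n; ++i)
            if (wc[i].status != IBV_WC_SUCCESS) ++failed;
        expected -= n;
    }
    return failed;
}

For testing, the patch also reads the MC_INJECT_FAILURE environment variable: when it is set to the name of a local RDMA device (e.g. MC_INJECT_FAILURE=mlx5_0, a hypothetical device name), roughly 1% of submitPostSend() batches on that device are posted with remote_addr 0, and the resulting work-completion errors can be used to exercise the endpoint failure-handling path.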