Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions ydb/library/actors/interconnect/interconnect_handshake.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1528,6 +1528,9 @@ namespace NActors {
} else {
sb << "Unable to complete rdma READ work request due to cq runtime error";
}
if (Rdma.Qp) {
sb << " qp: " << Rdma.Qp;
}
LOG_LOG_IC_X(NActorsServices::INTERCONNECT, "ICRDMA", NLog::PRI_ERROR, sb.c_str());
rdmaReadAck.SetErr(sb);
}
Expand Down
18 changes: 17 additions & 1 deletion ydb/library/actors/interconnect/interconnect_mon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,22 @@ namespace NInterconnect {
}
}

// Renders an XDC flag bitmask as a '|'-separated list of flag names.
// Yields "_" when none of the known flags is set.
TString XdcFlagsToString(const ui8 xdcFlags) {
    using TStats = TInterconnectProxyTCP::TProxyStats;

    struct TFlagLabel {
        ui8 Mask;
        const char* Label;
    };
    // Each label carries its trailing separator; the last one is trimmed below.
    static constexpr TFlagLabel knownFlags[] = {
        {TStats::XDCFlags::MSG_ZERO_COPY_SEND, "MSG_ZC_SEND|"},
        {TStats::XDCFlags::RDMA_READ, "RDMA_READ|"},
    };

    TString joined;
    for (const auto& [mask, label] : knownFlags) {
        if (xdcFlags & mask) {
            joined.append(label);
        }
    }

    if (joined.empty()) {
        joined.append("_");
        return joined;
    }

    // Drop the separator left behind by the last appended label.
    joined.pop_back();
    return joined;
}

TString GenerateHtml() {
TStringStream str;
HTML(str) {
Expand Down Expand Up @@ -131,7 +147,7 @@ namespace NInterconnect {
TABLED() { str << kv.second.TotalOutputQueueSize; }
TABLED() { str << (kv.second.Connected ? "yes" : "<strong>no</strong>"); }
TABLED() { str << (kv.second.ExternalDataChannel ? "yes" : "no")
<< " (" << (kv.second.XDCFlags & TInterconnectProxyTCP::TProxyStats::XDCFlags::MSG_ZERO_COPY_SEND ? "MSG_ZC_SEND" : "_") << ")"; }
<< " (" << XdcFlagsToString(kv.second.XDCFlags) << ")"; }
TABLED() { str << kv.second.Host; }
TABLED() { str << kv.second.Port; }
TABLED() {
Expand Down
1 change: 1 addition & 0 deletions ydb/library/actors/interconnect/interconnect_tcp_proxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ namespace NActors {
// Bit flags describing the state of the XDC; values are OR-ed together
// into a ui8 bitmask and tested with bitwise AND.
enum XDCFlags {
// No flag is set.
NONE = 0,
// Bit 0: reported when the zero-copy processor state is OK.
MSG_ZERO_COPY_SEND = 1,
// Bit 1: reported when the RDMA queue pair is in the RTS state.
RDMA_READ = 1 << 1,
};
ui8 XDCFlags;
};
Expand Down
21 changes: 15 additions & 6 deletions ydb/library/actors/interconnect/interconnect_tcp_session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,16 +68,25 @@ namespace NActors {
SendUpdateToWhiteboard();
}

// Reports the XDC flags of this session.
// Returns an empty optional when there is no XdcSocket. Otherwise the result
// starts as NONE and gains MSG_ZERO_COPY_SEND when ZcProcessor.ZcStateIsOk(),
// and RDMA_READ when RdmaQp is set and its queried state is RTS.
std::optional<ui8> TInterconnectSessionTCP::GetXDCFlags() const {
std::optional<ui8> TInterconnectSessionTCP::GetXDCFlags() const noexcept {
std::optional<ui8> flags;
using NInterconnect::NRdma::TQueuePair;
if (XdcSocket) {
flags = TInterconnectProxyTCP::TProxyStats::NONE;
if (ZcProcessor.ZcStateIsOk()) {
return TInterconnectProxyTCP::TProxyStats::MSG_ZERO_COPY_SEND;
} else {
return TInterconnectProxyTCP::TProxyStats::NONE;
*flags |= TInterconnectProxyTCP::TProxyStats::MSG_ZERO_COPY_SEND;
}
if (RdmaQp) {
// GetState returns a variant; only the TQpS alternative carries a state to inspect.
TQueuePair::TQpState res = RdmaQp->GetState(false);
TQueuePair::TQpS* qpState = std::get_if<TQueuePair::TQpS>(&res);
if (qpState) {
if (TQueuePair::IsRtsState(*qpState)) {
*flags |= TInterconnectProxyTCP::TProxyStats::RDMA_READ;
}
}
}
} else {
return {};
}
return flags;
}

void TInterconnectSessionTCP::CloseInputSession() {
Expand Down
2 changes: 1 addition & 1 deletion ydb/library/actors/interconnect/interconnect_tcp_session.h
Original file line number Diff line number Diff line change
Expand Up @@ -466,7 +466,7 @@ namespace NActors {
return ReceiveContext->ClockSkew_us;
}

std::optional<ui8> GetXDCFlags() const;
std::optional<ui8> GetXDCFlags() const noexcept;

private:
friend class TInterconnectProxyTCP;
Expand Down
17 changes: 15 additions & 2 deletions ydb/library/actors/interconnect/rdma/link_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,14 @@

// Ordering for ibv_gid values: compares (subnet_prefix, interface_id)
// lexicographically via std::tie.
template <>
struct std::less<ibv_gid> {
std::size_t operator()(const ibv_gid& a, const ibv_gid& b) const {
bool operator()(const ibv_gid& a, const ibv_gid& b) const noexcept {
return std::tie(a.global.subnet_prefix, a.global.interface_id) <
std::tie(b.global.subnet_prefix, b.global.interface_id);
}
};
template <>
struct std::equal_to<ibv_gid> {
bool operator()(const ibv_gid& a, const ibv_gid& b) const {
bool operator()(const ibv_gid& a, const ibv_gid& b) const noexcept {
return a.global.interface_id == b.global.interface_id
&& a.global.subnet_prefix == b.global.subnet_prefix;
}
Expand Down Expand Up @@ -118,10 +118,19 @@ static class TRdmaLinkManager {
continue;
}

/*
* TODO: Add ibv_query_gid_table support to the arcadia ibv wrapper (contrib/libs/ibdrv).
* That extension exposes the type of each GID index, which would let us skip unsupported RoCEv1 entries.
*/

for (uint8_t portNum = 1; portNum <= devAttrs.phys_port_cnt; portNum++) {
ibv_port_attr portAttrs;
err = ibv_query_port(ctx, portNum, &portAttrs);
if (err == 0) {
if (portAttrs.state != IBV_PORT_ACTIVE) {
continue; //Skip non active ports
}

for (int gidIndex = 0; gidIndex < portAttrs.gid_tbl_len; gidIndex++ ) {
auto ctx = TRdmaCtx::Create(deviceCtx, portNum, gidIndex);
if (!ctx) {
Expand All @@ -135,6 +144,10 @@ static class TRdmaLinkManager {
}
std::sort(CtxMap.begin(), CtxMap.end(),
[](const auto& a, const auto& b) {
if (std::equal_to<ibv_gid>()(a.first, b.first)) {
// Hack: Most implementations have RoCEv2 after RoCEv1, but we prefer RoCEv2 gid index
return a.second->GetGidIndex() > b.second->GetGidIndex();
}
return std::less<ibv_gid>()(a.first, b.first);
});

Expand Down
5 changes: 4 additions & 1 deletion ydb/library/actors/interconnect/rdma/rdma.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,9 @@ void TQueuePair::Output(IOutputStream& os) const noexcept {
} else {
os << attr.qp_state;
}
if (Ctx) {
os << ", ctx: " << *Ctx;
}
}

TQueuePair::TQpState TQueuePair::GetState(bool forseUpdate) const noexcept {
Expand Down Expand Up @@ -324,7 +327,7 @@ size_t TQueuePair::GetDeviceIndex() const noexcept {
return Ctx->GetDeviceIndex();
}

// Returns true when the given queue pair state equals IBV_QPS_RTS.
bool TQueuePair::IsRtsState(TQpS state) {
bool TQueuePair::IsRtsState(TQpS state) noexcept {
// TQpS keeps the state as a plain int, so convert it back to the verbs enum before comparing.
enum ibv_qp_state qpState = static_cast<enum ibv_qp_state>(state.State);
return qpState == IBV_QPS_RTS;
}
Expand Down
2 changes: 1 addition & 1 deletion ydb/library/actors/interconnect/rdma/rdma.h
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ class TQueuePair: public NNonCopyable::TMoveOnly {
struct TQpS {
int State;
};
static bool IsRtsState(TQpS state);
static bool IsRtsState(TQpS state) noexcept;
struct TQpErr {
int Err;
};
Expand Down
2 changes: 1 addition & 1 deletion ydb/library/actors/interconnect/ut/interconnect_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ Y_UNIT_TEST_SUITE(Interconnect) {
{
auto s = GetRdmaQpStatus(cluster, 1, 2);
auto tokens = SplitString(s, ",");
UNIT_ASSERT(tokens.size() == 2);
UNIT_ASSERT(tokens.size() > 2);
UNIT_ASSERT(tokens[1] == "QPS_RTS");
}

Expand Down
Loading