staryxchen
Contributor

Problem

The TCP transport did not start the handshake daemon during installation, so the RPC port never listened for incoming connections. As a result, peer notify messages could not be delivered, which broke inter-node communication.

Target's log
Note that currently only the log is being printed; port listening has not actually begun.

I0916 20:18:13.510746 21264 transfer_engine.cpp:114] Transfer Engine RPC using new RPC mapping, listening on 172.17.0.2:50475

Initiator's log

E0916 20:18:28.652805 21322 transfer_metadata_plugin.cpp:819] SocketHandShakePlugin: connect()172.17.0.2:50475: Connection refused [111]
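
The `Connection refused [111]` above is what Linux reports when a TCP port has been bound but nothing has called `listen()` on it. The following is a minimal sketch that reproduces this on loopback. The helper names `openLoopbackSocket` and `tryConnect` are hypothetical and are not part of the transfer engine.

```cpp
#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <unistd.h>

#include <cassert>
#include <cerrno>
#include <cstdint>

// Hypothetical helper: binds a TCP socket to a kernel-chosen loopback port.
// When `start_listening` is false the port is reserved but never accepts
// connections, similar to a target that logs its address without listening.
// Returns the fd (or -1 on failure) and stores the chosen port in `port`.
int openLoopbackSocket(bool start_listening, uint16_t &port) {
    int fd = socket(AF_INET, SOCK_STREAM, 0);
    if (fd < 0) return -1;
    sockaddr_in addr{};
    addr.sin_family = AF_INET;
    addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
    addr.sin_port = 0;  // let the kernel choose a free port
    socklen_t len = sizeof(addr);
    if (bind(fd, reinterpret_cast<sockaddr *>(&addr), sizeof(addr)) < 0 ||
        getsockname(fd, reinterpret_cast<sockaddr *>(&addr), &len) < 0 ||
        (start_listening && listen(fd, 8) < 0)) {
        close(fd);
        return -1;
    }
    port = ntohs(addr.sin_port);
    return fd;
}

// Hypothetical helper: tries to connect to 127.0.0.1:port.
// Returns 0 on success, otherwise the errno set by connect().
int tryConnect(uint16_t port) {
    int fd = socket(AF_INET, SOCK_STREAM, 0);
    if (fd < 0) return errno;
    sockaddr_in addr{};
    addr.sin_family = AF_INET;
    addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
    addr.sin_port = htons(port);
    int rc = connect(fd, reinterpret_cast<sockaddr *>(&addr), sizeof(addr));
    int err = (rc == 0) ? 0 : errno;
    close(fd);
    return err;
}
```

With `start_listening` set to false, `tryConnect` is expected to return `ECONNREFUSED` on Linux. With it set to true, the connection is accepted into the backlog even before any `accept()` call.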

Solution

  • Add a startHandshakeDaemon() method to the TcpTransport class. It is similar to the one in RDMATransport but does not register the on_receive_handshake callback function
  • Start the handshake daemon during the transport installation process

- Implement startHandshakeDaemon method in TcpTransport class
- Start the handshake daemon during transport installation process

Signed-off-by: staryxchen <[email protected]>
@stmatengss
Collaborator

TCP transport probably doesn't require starting the handshake daemon.

@staryxchen
Contributor Author

> TCP transport probably doesn't require starting the handshake daemon.

Without the handshake daemon, a notification cannot be delivered to the peer when a transfer task finishes.

int TransferMetadata::sendNotify(const std::string &peer_server_name,
                                 const NotifyDesc &local_desc,
                                 NotifyDesc &peer_desc) {
    RpcMetaDesc peer_location;
    if (getRpcMetaEntry(peer_server_name, peer_location)) {
        return ERR_METADATA;
    }
    auto local = TransferNotifyUtil::encode(local_desc);
    Json::Value peer;
    int ret = handshake_plugin_->sendNotify(
        peer_location.ip_or_host_name, peer_location.rpc_port, local, peer);
    if (ret) return ret;
    TransferNotifyUtil::decode(peer, peer_desc);
    if (peer_desc.notify_msg.empty()) {
        LOG(ERROR) << "Notify rejected by " << peer_server_name << ": "
                   << peer_desc.notify_msg;
        return ERR_METADATA;
    }
    return 0;
}

The daemon needs to listen on the RPC port and register a callback to process notify messages.
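
To illustrate what "listen and register a callback" means here, below is a rough, self-contained sketch of such a daemon. It is not the project's handshake plugin: `MiniNotifyDaemon`, `OnNotify`, and `sendNotifyOnce` are hypothetical names, the wire format is a plain string rather than JSON, and each message is assumed to fit in a single `recv()`.

```cpp
#include <arpa/inet.h>
#include <netinet/in.h>
#include <poll.h>
#include <sys/socket.h>
#include <unistd.h>

#include <atomic>
#include <cassert>
#include <cstdint>
#include <functional>
#include <string>
#include <thread>

// Hypothetical stand-in for a handshake daemon (illustration only).
class MiniNotifyDaemon {
   public:
    using OnNotify = std::function<std::string(const std::string &)>;

    ~MiniNotifyDaemon() { stop(); }

    // Listens on 127.0.0.1:`port` (0 = kernel-chosen) and serves requests on
    // a background thread. Returns the bound port, or 0 on failure.
    uint16_t start(uint16_t port, OnNotify on_notify) {
        listen_fd_ = socket(AF_INET, SOCK_STREAM, 0);
        if (listen_fd_ < 0) return 0;
        int on = 1;
        setsockopt(listen_fd_, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
        sockaddr_in addr{};
        addr.sin_family = AF_INET;
        addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
        addr.sin_port = htons(port);
        socklen_t len = sizeof(addr);
        auto *sa = reinterpret_cast<sockaddr *>(&addr);
        if (bind(listen_fd_, sa, sizeof(addr)) < 0 ||
            listen(listen_fd_, 16) < 0 || getsockname(listen_fd_, sa, &len) < 0) {
            close(listen_fd_);
            listen_fd_ = -1;
            return 0;
        }
        running_ = true;
        worker_ = std::thread([this, cb = std::move(on_notify)] { serve(cb); });
        return ntohs(addr.sin_port);
    }

    void stop() {
        running_ = false;
        if (worker_.joinable()) worker_.join();
        if (listen_fd_ >= 0) close(listen_fd_);
        listen_fd_ = -1;
    }

   private:
    // Accept loop: one request and one reply per connection.
    void serve(const OnNotify &on_notify) {
        while (running_) {
            pollfd pfd{listen_fd_, POLLIN, 0};
            if (poll(&pfd, 1, 100) <= 0) continue;  // timeout: re-check flag
            int conn = accept(listen_fd_, nullptr, nullptr);
            if (conn < 0) continue;
            char buf[1024];
            ssize_t n = recv(conn, buf, sizeof(buf), 0);
            if (n > 0) {
                std::string reply = on_notify(std::string(buf, n));
                send(conn, reply.data(), reply.size(), MSG_NOSIGNAL);
            }
            close(conn);
        }
    }

    int listen_fd_ = -1;
    std::atomic<bool> running_{false};
    std::thread worker_;
};

// Client side: sends one message and returns the reply (empty on failure).
std::string sendNotifyOnce(uint16_t port, const std::string &msg) {
    int fd = socket(AF_INET, SOCK_STREAM, 0);
    if (fd < 0) return "";
    sockaddr_in addr{};
    addr.sin_family = AF_INET;
    addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
    addr.sin_port = htons(port);
    std::string reply;
    if (connect(fd, reinterpret_cast<sockaddr *>(&addr), sizeof(addr)) == 0 &&
        send(fd, msg.data(), msg.size(), MSG_NOSIGNAL) ==
            static_cast<ssize_t>(msg.size())) {
        char buf[1024];
        ssize_t n = recv(fd, buf, sizeof(buf), 0);
        if (n > 0) reply.assign(buf, n);
    }
    close(fd);
    return reply;
}
```

Once `stop()` has run (or if `start()` was never called), `sendNotifyOnce` gets no reply because nothing is listening, which is the situation the TCP transport was in.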

@stmatengss
Collaborator

I see. So can we support P2PHANDSHAKE-based TCP transport with this feature?

@staryxchen
Contributor Author

staryxchen commented Sep 17, 2025

> I see. So can we support P2PHANDSHAKE-based TCP transport with this feature?

No, even if we merge this patch, we still cannot support it. The root cause is the following code segment:

        if (metadata_conn_string == P2PHANDSHAKE) {
            rpc_binding_method = "P2P handshake";
            desc.rpc_port = findAvailableTcpPort(desc.sockfd);
            if (desc.rpc_port == 0) {
                LOG(ERROR) << "P2P: No valid port found for local TCP service.";
                return -1;
            }
#if defined(USE_ASCEND)
            // The current version of Ascend Transport does not support IPv6,
            // but it will be added in a future release.
            local_server_name_ =
                desc.ip_or_host_name + ":" + std::to_string(desc.rpc_port);
#else
            local_server_name_ = maybeWrapIpV6(desc.ip_or_host_name) + ":" +
                                 std::to_string(desc.rpc_port);
#endif

I think there is no need to find another available TCP port in P2PHANDSHAKE mode. Simply binding to the port resolved from the local server name is enough, like the following:

        if (metadata_conn_string == P2PHANDSHAKE) {
            rpc_binding_method = "P2P handshake";
            // desc.rpc_port = findAvailableTcpPort(desc.sockfd);
            // if (desc.rpc_port == 0) {
            //     LOG(ERROR) << "P2P: No valid port found for local TCP service.";
            //     return -1;
            // }
            // Assumes desc.rpc_port already holds the port resolved from the
            // local server name.
            desc.sockfd = socket(AF_INET, SOCK_STREAM, 0);
            if (desc.sockfd == -1) {
                LOG(ERROR) << "P2P: Failed to create socket.";
                return -1;
            }
            struct timeval timeout;
            timeout.tv_sec = 1;
            timeout.tv_usec = 0;
            if (setsockopt(desc.sockfd, SOL_SOCKET, SO_RCVTIMEO, &timeout,
                           sizeof(timeout))) {
                LOG(ERROR) << "P2P: Failed to set socket timeout.";
                close(desc.sockfd);  // do not leak the fd on failure
                return -1;
            }
            sockaddr_in bind_address;
            memset(&bind_address, 0, sizeof(bind_address));
            bind_address.sin_family = AF_INET;
            bind_address.sin_port = htons(desc.rpc_port);
            bind_address.sin_addr.s_addr = INADDR_ANY;
            if (bind(desc.sockfd, (sockaddr *)&bind_address, sizeof(bind_address)) < 0) {
                LOG(ERROR) << "P2P: Failed to bind socket.";
                close(desc.sockfd);  // do not leak the fd on failure
                return -1;
            }
#ifdef USE_ASCEND
            // The current version of Ascend Transport does not support IPv6, but it will be added in a future release.
            local_server_name_ = desc.ip_or_host_name + ":" + std::to_string(desc.rpc_port);
#else
            local_server_name_ = maybeWrapIpV6(desc.ip_or_host_name) + ":" + std::to_string(desc.rpc_port);
#endif
        }

What do you think?
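
For reference, here is a standalone sketch of the idea, under some assumptions: the local server name has the form "host:port" or "[v6addr]:port", and the helpers `portFromServerName` and `bindFixedPort` are hypothetical names for illustration, not existing functions in the code base. Like the snippet above, it is IPv4-only.

```cpp
#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <sys/time.h>
#include <unistd.h>

#include <cassert>
#include <cstdint>
#include <string>

// Hypothetical helper: extracts the port from "host:port" or "[v6addr]:port".
// Returns 0 when the port is missing, malformed, or out of range.
uint16_t portFromServerName(const std::string &name) {
    size_t colon = name.rfind(':');
    if (colon == std::string::npos || colon + 1 >= name.size()) return 0;
    // A bare IPv6 literal such as "::1" contains colons but no port, so with
    // multiple colons the last one must directly follow a closing bracket.
    if (name.find(':') != colon && (colon == 0 || name[colon - 1] != ']')) {
        return 0;
    }
    const std::string digits = name.substr(colon + 1);
    if (digits.size() > 5) return 0;
    unsigned long value = 0;
    for (char c : digits) {
        if (c < '0' || c > '9') return 0;
        value = value * 10 + static_cast<unsigned long>(c - '0');
    }
    return value <= 65535 ? static_cast<uint16_t>(value) : 0;
}

// Hypothetical helper: creates a TCP socket bound to `port` on all IPv4
// interfaces with a 1-second receive timeout. The fd is closed on any
// failure, so the caller only owns it when the return value is >= 0.
int bindFixedPort(uint16_t port) {
    int fd = socket(AF_INET, SOCK_STREAM, 0);
    if (fd < 0) return -1;
    timeval timeout{1, 0};
    sockaddr_in addr{};
    addr.sin_family = AF_INET;
    addr.sin_port = htons(port);
    addr.sin_addr.s_addr = htonl(INADDR_ANY);
    if (setsockopt(fd, SOL_SOCKET, SO_RCVTIMEO, &timeout, sizeof(timeout)) < 0 ||
        bind(fd, reinterpret_cast<sockaddr *>(&addr), sizeof(addr)) < 0) {
        close(fd);
        return -1;
    }
    return fd;
}
```

A caller would do something like `bindFixedPort(portFromServerName(name))` after checking that the parsed port is non-zero. One trade-off of a fixed port is that `bind()` fails if another socket already holds it, whereas searching for a free port avoids that at the cost of the name and the port possibly disagreeing.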
