From 9bc18bdf9b8fb11bd21ea465266c0b2ff142c233 Mon Sep 17 00:00:00 2001 From: staryxchen Date: Tue, 16 Sep 2025 20:01:10 +0800 Subject: [PATCH] fix(tcp): add handshake daemon initialization - Implement startHandshakeDaemon method in TcpTransport class - Call handshake daemon during transport installation process Signed-off-by: staryxchen --- .../include/transport/tcp_transport/tcp_transport.h | 2 ++ .../src/transport/tcp_transport/tcp_transport.cpp | 12 ++++++++++++ 2 files changed, 14 insertions(+) diff --git a/mooncake-transfer-engine/include/transport/tcp_transport/tcp_transport.h b/mooncake-transfer-engine/include/transport/tcp_transport/tcp_transport.h index adf96c241..f8c95d7e2 100644 --- a/mooncake-transfer-engine/include/transport/tcp_transport/tcp_transport.h +++ b/mooncake-transfer-engine/include/transport/tcp_transport/tcp_transport.h @@ -60,6 +60,8 @@ class TcpTransport : public Transport { std::shared_ptr meta, std::shared_ptr topo); + int startHandshakeDaemon(); + int allocateLocalSegmentID(int tcp_data_port); int registerLocalMemory(void *addr, size_t length, diff --git a/mooncake-transfer-engine/src/transport/tcp_transport/tcp_transport.cpp b/mooncake-transfer-engine/src/transport/tcp_transport/tcp_transport.cpp index 71c2176ed..6c636d2ff 100644 --- a/mooncake-transfer-engine/src/transport/tcp_transport/tcp_transport.cpp +++ b/mooncake-transfer-engine/src/transport/tcp_transport/tcp_transport.cpp @@ -273,6 +273,12 @@ TcpTransport::~TcpTransport() { metadata_->removeSegmentDesc(local_server_name_); } +int TcpTransport::startHandshakeDaemon() { + return metadata_->startHandshakeDaemon(nullptr, + metadata_->localRpcMeta().rpc_port, + metadata_->localRpcMeta().sockfd); +} + int TcpTransport::install(std::string &local_server_name, std::shared_ptr meta, std::shared_ptr topo) { @@ -292,6 +298,12 @@ int TcpTransport::install(std::string &local_server_name, return -1; } + ret = startHandshakeDaemon(); + if (ret) { + LOG(ERROR) << "TcpTransport: cannot start handshake daemon"; + return -1; + } + ret = metadata_->updateLocalSegmentDesc(); if (ret) { LOG(ERROR) << "TcpTransport: cannot publish segments, "