Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion mooncake-store/include/master_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ class MasterClient {
* containing view version and client status
*/
[[nodiscard]] tl::expected<PingResponse, ErrorCode> Ping(
const UUID& client_id);
const UUID& client_id, size_t qp_count);

private:
/**
Expand Down
4 changes: 4 additions & 0 deletions mooncake-store/include/master_metric_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,10 @@ class MasterMetricManager {
// Cluster Metrics
void inc_active_clients(int64_t val = 1);
void dec_active_clients(int64_t val = 1);
void inc_cluster_total_qp_num(int64_t val = 1);
void dec_cluster_total_qp_num(int64_t val = 1);
int64_t get_active_clients();
int64_t get_cluster_total_qp_num();

// Operation Statistics (Counters)
void inc_put_start_requests(int64_t val = 1);
Expand Down Expand Up @@ -178,6 +181,7 @@ class MasterMetricManager {

// Cluster Metrics
ylt::metric::gauge_t active_clients_;
ylt::metric::gauge_t cluster_total_qp_num_;

// Operation Statistics
ylt::metric::counter_t put_start_requests_;
Expand Down
5 changes: 4 additions & 1 deletion mooncake-store/include/master_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,8 @@ class MasterService {
* @return ErrorCode::OK on success, ErrorCode::INTERNAL_ERROR if the client
* ping queue is full
*/
auto Ping(const UUID& client_id) -> tl::expected<PingResponse, ErrorCode>;
auto Ping(const UUID& client_id, size_t qp_count)
-> tl::expected<PingResponse, ErrorCode>;

/**
* @brief Get the master service cluster ID to use as subdirectory name
Expand Down Expand Up @@ -435,6 +436,8 @@ class MasterService {
mutable std::shared_mutex client_mutex_;
std::unordered_set<UUID, boost::hash<UUID>>
ok_client_; // client with ok status
std::unordered_map<UUID, uint64_t, boost::hash<UUID>>
client_qp_counts_; // QP count per client
void ClientMonitorFunc();
std::thread client_monitor_thread_;
std::atomic<bool> client_monitor_running_{false};
Expand Down
3 changes: 2 additions & 1 deletion mooncake-store/include/rpc_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,8 @@ class WrappedMasterService {

tl::expected<std::string, ErrorCode> GetFsdir();

tl::expected<PingResponse, ErrorCode> Ping(const UUID& client_id);
tl::expected<PingResponse, ErrorCode> Ping(const UUID& client_id,
size_t qp_count);

private:
MasterService master_service_;
Expand Down
13 changes: 9 additions & 4 deletions mooncake-store/include/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -489,19 +489,24 @@ inline std::ostream& operator<<(std::ostream& os,
struct PingResponse {
ViewVersionId view_version_id;
ClientStatus client_status;
uint64_t total_qp_num{0};

PingResponse() = default;
PingResponse(ViewVersionId view_version, ClientStatus status)
: view_version_id(view_version), client_status(status) {}
PingResponse(ViewVersionId view_version, ClientStatus status,
uint64_t qp_number = 0)
: view_version_id(view_version),
client_status(status),
total_qp_num(qp_number) {}

friend std::ostream& operator<<(std::ostream& os,
const PingResponse& response) noexcept {
return os << "PingResponse: { view_version_id: "
<< response.view_version_id
<< ", client_status: " << response.client_status << " }";
<< ", client_status: " << response.client_status
<< ", total_qp_num: " << response.total_qp_num << " }";
}
};
YLT_REFL(PingResponse, view_version_id, client_status);
YLT_REFL(PingResponse, view_version_id, client_status, total_qp_num);

enum class BufferAllocatorType {
CACHELIB = 0, // CachelibBufferAllocator
Expand Down
4 changes: 3 additions & 1 deletion mooncake-store/src/client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1289,7 +1289,9 @@ void Client::PingThreadFunc() {
}

// Ping master
auto ping_result = master_client_.Ping(client_id_);
size_t qp_num_per_client = transfer_engine_.getTotalQpNum();
auto ping_result =
master_client_.Ping(client_id_, (const size_t&)qp_num_per_client);
if (ping_result) {
// Reset ping failure count
ping_fail_count = 0;
Expand Down
8 changes: 4 additions & 4 deletions mooncake-store/src/master_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -402,13 +402,13 @@ tl::expected<void, ErrorCode> MasterClient::UnmountSegment(
return result;
}

tl::expected<PingResponse, ErrorCode> MasterClient::Ping(
const UUID& client_id) {
tl::expected<PingResponse, ErrorCode> MasterClient::Ping(const UUID& client_id,
size_t qp_count) {
ScopedVLogTimer timer(1, "MasterClient::Ping");
timer.LogRequest("client_id=", client_id);

auto result =
invoke_rpc<&WrappedMasterService::Ping, PingResponse>(client_id);
auto result = invoke_rpc<&WrappedMasterService::Ping, PingResponse>(
client_id, qp_count);
timer.LogResponseExpected(result);
return result;
}
Expand Down
17 changes: 17 additions & 0 deletions mooncake-store/src/master_metric_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ MasterMetricManager::MasterMetricManager()
// Initialize cluster metrics
active_clients_("master_active_clients",
"Total number of active clients"),
cluster_total_qp_num_("master_cluster_total_qp_num",
"Total number of QP across all clients"),

// Initialize Request Counters
put_start_requests_("master_put_start_requests_total",
Expand Down Expand Up @@ -241,6 +243,18 @@ int64_t MasterMetricManager::get_active_clients() {
return active_clients_.value();
}

// Raises the cluster-wide QP gauge (`cluster_total_qp_num_`) by `val`.
// The value is forwarded unchanged to the gauge's inc().
void MasterMetricManager::inc_cluster_total_qp_num(int64_t val) {
cluster_total_qp_num_.inc(val);
}

// Lowers the cluster-wide QP gauge (`cluster_total_qp_num_`) by `val`.
// The value is forwarded unchanged to the gauge's dec().
void MasterMetricManager::dec_cluster_total_qp_num(int64_t val) {
cluster_total_qp_num_.dec(val);
}

// Returns the current reading of the cluster-wide QP gauge
// (`cluster_total_qp_num_`).
int64_t MasterMetricManager::get_cluster_total_qp_num() {
return cluster_total_qp_num_.value();
}

// Operation Statistics (Counters)
void MasterMetricManager::inc_exist_key_requests(int64_t val) {
exist_key_requests_.inc(val);
Expand Down Expand Up @@ -616,6 +630,7 @@ std::string MasterMetricManager::serialize_metrics() {
serialize_metric(total_capacity_);
serialize_metric(key_count_);
serialize_metric(soft_pin_key_count_);
serialize_metric(cluster_total_qp_num_);
if (enable_ha_) {
serialize_metric(active_clients_);
}
Expand Down Expand Up @@ -680,6 +695,7 @@ std::string MasterMetricManager::get_summary_string() {
int64_t keys = key_count_.value();
int64_t soft_pin_keys = soft_pin_key_count_.value();
int64_t active_clients = active_clients_.value();
int64_t cluster_total_qp_num = cluster_total_qp_num_.value();

// Request counters
int64_t exist_keys = exist_key_requests_.value();
Expand Down Expand Up @@ -754,6 +770,7 @@ std::string MasterMetricManager::get_summary_string() {
<< ((double)allocated / (double)capacity * 100.0) << "%)";
}
ss << " | Keys: " << keys << " (soft-pinned: " << soft_pin_keys << ")";
ss << " | QP: " << cluster_total_qp_num;
if (enable_ha_) {
ss << " | Clients: " << active_clients;
}
Expand Down
4 changes: 2 additions & 2 deletions mooncake-store/src/master_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -567,7 +567,7 @@ size_t MasterService::GetKeyCount() const {
return total;
}

auto MasterService::Ping(const UUID& client_id)
auto MasterService::Ping(const UUID& client_id, const size_t qp_count)
-> tl::expected<PingResponse, ErrorCode> {
if (!enable_ha_) {
LOG(ERROR) << "Ping is only available in HA mode";
Expand All @@ -589,7 +589,7 @@ auto MasterService::Ping(const UUID& client_id)
<< ", error=client_ping_queue_full";
return tl::make_unexpected(ErrorCode::INTERNAL_ERROR);
}
return PingResponse(view_version_, client_status);
return PingResponse(view_version_, client_status, 0);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like `qp_count` should be passed in here instead of the hard-coded `0`?

}

tl::expected<std::string, ErrorCode> MasterService::GetFsdir() const {
Expand Down
12 changes: 9 additions & 3 deletions mooncake-store/src/rpc_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -491,13 +491,19 @@ tl::expected<std::string, ErrorCode> WrappedMasterService::GetFsdir() {
}

tl::expected<PingResponse, ErrorCode> WrappedMasterService::Ping(
const UUID& client_id) {
const UUID& client_id, size_t qp_count) {
ScopedVLogTimer timer(1, "Ping");
timer.LogRequest("client_id=", client_id);

MasterMetricManager::instance().inc_ping_requests();

auto result = master_service_.Ping(client_id);
auto result = master_service_.Ping(client_id, 0);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like `qp_count` should be passed in here instead of the hard-coded `0`?


if (result.has_value()) {
// Increment cluster total QP number based on client's reported QP count
MasterMetricManager::instance().inc_cluster_total_qp_num(
result.value().total_qp_num);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The current implementation increments the QP count on every ping, so the total keeps accumulating and may become inaccurate. Maybe we should maintain a `map<uuid, qp_count>` in master_metrics, update each client's entry during pings, and sum all the entries when the total is queried.


timer.LogResponseExpected(result);
return result;
Expand Down Expand Up @@ -543,4 +549,4 @@ void RegisterRpcService(
&wrapped_master_service);
}

} // namespace mooncake
} // namespace mooncake
3 changes: 3 additions & 0 deletions mooncake-transfer-engine/include/transfer_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,9 @@ class TransferEngine {
return local_topology_;
}

// Get the total number of QPs from RDMA transport
size_t getTotalQpNum() const;

private:
struct MemoryRegion {
void *addr;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ class EndpointStore {
virtual int disconnectQPs() = 0;

// Get the total number of QPs across all endpoints
virtual size_t getTotalQPNumber() = 0;
virtual size_t getTotalQpNum() = 0;
};

// FIFO
Expand All @@ -68,7 +68,7 @@ class FIFOEndpointStore : public EndpointStore {
int destroyQPs() override;
int disconnectQPs() override;

size_t getTotalQPNumber() override;
size_t getTotalQpNum() override;

private:
RWSpinlock endpoint_map_lock_;
Expand Down Expand Up @@ -99,7 +99,7 @@ class SIEVEEndpointStore : public EndpointStore {
int destroyQPs() override;
int disconnectQPs() override;

size_t getTotalQPNumber() override;
size_t getTotalQpNum() override;

private:
RWSpinlock endpoint_map_lock_;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ class RdmaContext {
int disconnectAllEndpoints();

// Get the total number of QPs across all endpoints in this context
size_t getTotalQPNumber() const;
size_t getTotalQpNum() const;

public:
// Device name, such as `mlx5_3`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,9 @@ class RdmaTransport : public Transport {
std::string_view hint, int &buffer_id,
int &device_id, int retry_cnt = 0);

// Get the total number of QPs across all RDMA contexts
size_t getTotalQpNum() const;

private:
std::vector<std::shared_ptr<RdmaContext>> context_list_;
std::shared_ptr<Topology> local_topology_;
Expand All @@ -126,4 +129,4 @@ using BatchID = Transport::BatchID;

} // namespace mooncake

#endif // RDMA_TRANSPORT_H_
#endif // RDMA_TRANSPORT_H_
18 changes: 18 additions & 0 deletions mooncake-transfer-engine/src/transfer_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,10 @@
#include <sys/resource.h>
#include <unistd.h>

#include "common.h"
#include "transfer_metadata_plugin.h"
#include "transport/transport.h"
#include "transport/rdma_transport/rdma_transport.h"

namespace mooncake {

Expand Down Expand Up @@ -435,6 +437,22 @@ int TransferEngine::unregisterLocalMemoryBatch(
return 0;
}

size_t TransferEngine::getTotalQpNum() const {
Transport *rdma_transport = multi_transports_->getTransport("rdma");
if (!rdma_transport) {
return 0;
}

// Cast to RdmaTransport to access the actual QP counting functionality
RdmaTransport *rdma = dynamic_cast<RdmaTransport *>(rdma_transport);
if (!rdma) {
LOG(ERROR) << "Failed to cast RDMA transport to RdmaTransport type";
return 0;
}

return rdma->getTotalQpNum();
}

#ifdef WITH_METRICS
// Helper function to convert string to lowercase for case-insensitive
// comparison
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ int FIFOEndpointStore::disconnectQPs() {
return 0;
}

size_t FIFOEndpointStore::getTotalQPNumber() {
size_t FIFOEndpointStore::getTotalQpNum() {
RWSpinlock::ReadGuard guard(endpoint_map_lock_);
size_t total_qps = 0;
for (const auto &kv : endpoint_map_) {
Expand Down Expand Up @@ -238,7 +238,7 @@ int SIEVEEndpointStore::disconnectQPs() {

size_t SIEVEEndpointStore::getSize() { return endpoint_map_.size(); }

size_t SIEVEEndpointStore::getTotalQPNumber() {
size_t SIEVEEndpointStore::getTotalQpNum() {
RWSpinlock::ReadGuard guard(endpoint_map_lock_);
size_t total_qps = 0;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -328,8 +328,8 @@ int RdmaContext::deleteEndpoint(const std::string &peer_nic_path) {
return endpoint_store_->deleteEndpoint(peer_nic_path);
}

size_t RdmaContext::getTotalQPNumber() const {
return endpoint_store_->getTotalQPNumber();
size_t RdmaContext::getTotalQpNum() const {
return endpoint_store_->getTotalQpNum();
}

std::string RdmaContext::nicPath() const {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -505,4 +505,14 @@ int RdmaTransport::selectDevice(SegmentDesc *desc, uint64_t offset,
return selectDevice(desc, offset, length, "", buffer_id, device_id,
retry_count);
}

// Adds up getTotalQpNum() over every entry of context_list_.
// Null entries and contexts whose active() returns false contribute nothing.
size_t RdmaTransport::getTotalQpNum() const {
    size_t qp_sum = 0;
    for (const auto &ctx : context_list_) {
        if (!ctx || !ctx->active()) continue;
        qp_sum += ctx->getTotalQpNum();
    }
    return qp_sum;
}
} // namespace mooncake
Loading