Skip to content

Commit 1ba8d9a

Browse files
author
youxiao
committed
add batch_put_from_multi_buffers
1 parent 69cf6ea commit 1ba8d9a

File tree

15 files changed

+855
-165
lines changed

15 files changed

+855
-165
lines changed

mooncake-integration/store/store_py.cpp

Lines changed: 51 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -546,7 +546,57 @@ PYBIND11_MODULE(store, m) {
546546
py::arg("config") = ReplicateConfig{})
547547
.def("get_hostname", [](MooncakeStorePyWrapper &self) {
548548
return self.store_->get_hostname();
549-
});
549+
})
550+
.def(
551+
"batch_put_from_multi_buffers",
552+
[](MooncakeStorePyWrapper &self,
553+
const std::vector<std::string> &keys,
554+
const std::vector<std::vector<uintptr_t>> &all_buffer_ptrs,
555+
const std::vector<std::vector<size_t>> &all_sizes,
556+
const ReplicateConfig &config = ReplicateConfig{}) {
557+
std::vector<std::vector<void *>> all_buffers;
558+
all_buffers.reserve(all_buffer_ptrs.size());
559+
for (auto &buffer_ptrs : all_buffer_ptrs) {
560+
std::vector<void *> ptrs;
561+
ptrs.reserve(buffer_ptrs.size());
562+
for (uintptr_t ptr : buffer_ptrs) {
563+
ptrs.push_back(reinterpret_cast<void *>(ptr));
564+
}
565+
all_buffers.emplace_back(std::move(ptrs));
566+
}
567+
py::gil_scoped_release release;
568+
return self.store_->batch_put_from_multi_buffers(
569+
keys, all_buffers, all_sizes, config);
570+
},
571+
py::arg("keys"), py::arg("all_buffer_ptrs"), py::arg("all_sizes"),
572+
py::arg("config") = ReplicateConfig{},
573+
"Put object data directly from multiple pre-allocated buffers for "
574+
"multiple "
575+
"keys")
576+
.def(
577+
"batch_get_into_multi_buffers",
578+
[](MooncakeStorePyWrapper &self,
579+
const std::vector<std::string> &keys,
580+
const std::vector<std::vector<uintptr_t>> &all_buffer_ptrs,
581+
const std::vector<std::vector<size_t>> &all_sizes) {
582+
std::vector<std::vector<void *>> all_buffers;
583+
all_buffers.reserve(all_buffer_ptrs.size());
584+
for (auto &buffer_ptrs : all_buffer_ptrs) {
585+
std::vector<void *> ptrs;
586+
ptrs.reserve(buffer_ptrs.size());
587+
for (uintptr_t ptr : buffer_ptrs) {
588+
ptrs.push_back(reinterpret_cast<void *>(ptr));
589+
}
590+
all_buffers.emplace_back(std::move(ptrs));
591+
}
592+
py::gil_scoped_release release;
593+
return self.store_->batch_get_into_multi_buffers(
594+
keys, all_buffers, all_sizes);
595+
},
596+
py::arg("keys"), py::arg("all_buffer_ptrs"), py::arg("all_sizes"),
597+
"Get object data directly into multiple pre-allocated buffers for "
598+
"multiple "
599+
"keys");
550600

551601
// Expose NUMA binding as a module-level function (no self required)
552602
m.def(

mooncake-integration/transfer_engine/transfer_engine_py.cpp

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,11 +112,16 @@ int TransferEnginePy::initializeExt(const char *local_hostname,
112112
const char *device_name,
113113
const char *metadata_type) {
114114
(void)(protocol);
115+
if (g_transfer_engine) {
116+
engine_ = g_transfer_engine;
117+
return 0;
118+
}
115119
std::string conn_string = buildConnString(metadata_type, metadata_server);
116120

117121
auto device_name_safe = device_name ? std::string(device_name) : "";
118122
auto device_filter = buildDeviceFilter(device_name_safe);
119-
engine_ = std::make_unique<TransferEngine>(true, device_filter);
123+
engine_ = std::make_shared<TransferEngine>(true, device_filter);
124+
LOG(INFO) << "TransferEnginePy InitTransferEngine";
120125
if (getenv("MC_LEGACY_RPC_PORT_BINDING")) {
121126
auto hostname_port = parseHostNameWithPort(local_hostname);
122127
int ret =
@@ -129,6 +134,8 @@ int TransferEnginePy::initializeExt(const char *local_hostname,
129134
if (ret) return -1;
130135
}
131136

137+
g_transfer_engine = engine_;
138+
LOG(INFO) << "TransferEnginePy InitTransferEngine end: " << g_transfer_engine;
132139
free_list_.resize(kSlabSizeKBTabLen);
133140
#if !defined(USE_ASCEND) && !defined(USE_ASCEND_DIRECT) && \
134141
!defined(USE_ASCEND_HETEROGENEOUS)

mooncake-store/include/client.h

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -239,7 +239,7 @@ class Client {
239239
}
240240

241241
[[nodiscard]] std::string GetTransportEndpoint() {
242-
return transfer_engine_.getLocalIpAndPort();
242+
return transfer_engine_->getLocalIpAndPort();
243243
}
244244

245245
private:
@@ -300,11 +300,20 @@ class Client {
300300
std::vector<tl::expected<void, ErrorCode>> CollectResults(
301301
const std::vector<PutOperation>& ops);
302302

303+
void StartBatchPutWhenSameNodeFirst(std::vector<PutOperation>& ops,
304+
const ReplicateConfig& config);
305+
std::vector<tl::expected<void, ErrorCode>> BatchPutWhenSameNodeFirst(
306+
std::vector<PutOperation>& ops);
307+
std::vector<tl::expected<void, ErrorCode>> BatchGetWhenSameNodeFirst(
308+
const std::vector<std::string>& object_keys,
309+
const std::vector<std::vector<Replica::Descriptor>>& replica_lists,
310+
std::unordered_map<std::string, std::vector<Slice>>& slices);
311+
303312
// Client-side metrics
304313
std::unique_ptr<ClientMetric> metrics_;
305314

306315
// Core components
307-
TransferEngine transfer_engine_;
316+
std::shared_ptr<TransferEngine> transfer_engine_;
308317
MasterClient master_client_;
309318
std::unique_ptr<TransferSubmitter> transfer_submitter_;
310319

mooncake-store/include/pybind_client.h

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,22 @@ class PyClient {
127127
const std::vector<void *> &buffers,
128128
const std::vector<size_t> &sizes);
129129

130+
/**
131+
* @brief Put object data directly from pre-allocated buffers for key
132+
* @param keys key of the objects to put
133+
* @param buffers Vector of pointers to the pre-allocated buffers
134+
* @param sizes Vector of sizes of the buffers
135+
* @param config Replication configuration
136+
* @return Vector of integers, where each element is 0 on success, or a
137+
* negative value on error
138+
* @note The buffer addresses must be previously registered with
139+
* register_buffer() for zero-copy operations
140+
*/
141+
std::vector<int> batch_get_into_multi_buffers(
142+
const std::vector<std::string> &keys,
143+
const std::vector<std::vector<void *>> &all_buffers,
144+
const std::vector<std::vector<size_t>> &all_sizes);
145+
130146
/**
131147
* @brief Put object data directly from a pre-allocated buffer
132148
* @param key Key of the object to put
@@ -178,6 +194,24 @@ class PyClient {
178194
const std::vector<void *> &buffers, const std::vector<size_t> &sizes,
179195
const ReplicateConfig &config = ReplicateConfig{});
180196

197+
/**
198+
* @brief Put object data directly from multiple pre-allocated buffers for multiple
199+
* keys (batch version)
200+
* @param keys Vector of keys of the objects to put
201+
* @param buffers Vector of pointers to the multiple pre-allocated buffers
202+
* @param sizes Vector of sizes of the multiple buffers
203+
* @param config Replication configuration
204+
* @return Vector of integers, where each element is 0 on success, or a
205+
* negative value on error
206+
* @note The buffer addresses must be previously registered with
207+
* register_buffer() for zero-copy operations
208+
*/
209+
std::vector<int> batch_put_from_multi_buffers(
210+
const std::vector<std::string> &keys,
211+
const std::vector<std::vector<void *>> &all_buffers,
212+
const std::vector<std::vector<size_t>> &all_sizes,
213+
const ReplicateConfig &config = ReplicateConfig{});
214+
181215
int put_parts(const std::string &key,
182216
std::vector<std::span<const char>> values,
183217
const ReplicateConfig &config = ReplicateConfig{});
@@ -266,6 +300,12 @@ class PyClient {
266300
const std::vector<std::string> &keys,
267301
const std::vector<void *> &buffers, const std::vector<size_t> &sizes);
268302

303+
std::vector<tl::expected<int64_t, ErrorCode>>
304+
batch_get_into_multi_buffers_internal(
305+
const std::vector<std::string> &keys,
306+
const std::vector<std::vector<void *>> &all_buffers,
307+
const std::vector<std::vector<size_t>> &all_sizes);
308+
269309
tl::expected<void, ErrorCode> put_from_internal(
270310
const std::string &key, void *buffer, size_t size,
271311
const ReplicateConfig &config = ReplicateConfig{});
@@ -275,6 +315,13 @@ class PyClient {
275315
const std::vector<void *> &buffers, const std::vector<size_t> &sizes,
276316
const ReplicateConfig &config = ReplicateConfig{});
277317

318+
std::vector<tl::expected<void, ErrorCode>>
319+
batch_put_from_multi_buffers_internal(
320+
const std::vector<std::string> &keys,
321+
const std::vector<std::vector<void *>> &all_buffers,
322+
const std::vector<std::vector<size_t>> &all_sizes,
323+
const ReplicateConfig &config = ReplicateConfig{});
324+
278325
tl::expected<void, ErrorCode> put_parts_internal(
279326
const std::string &key, std::vector<std::span<const char>> values,
280327
const ReplicateConfig &config = ReplicateConfig{});

mooncake-store/include/transfer_task.h

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -371,6 +371,11 @@ class TransferSubmitter {
371371
const Replica::Descriptor& replica, std::vector<Slice>& slices,
372372
Transport::TransferRequest::OpCode op_code);
373373

374+
std::optional<TransferFuture> submit_batch(
375+
const std::vector<Replica::Descriptor>& replicas,
376+
std::vector<std::vector<Slice>>& all_slices,
377+
Transport::TransferRequest::OpCode op_code);
378+
374379
private:
375380
TransferEngine& engine_;
376381
std::unique_ptr<MemcpyWorkerPool> memcpy_pool_;
@@ -396,7 +401,8 @@ class TransferSubmitter {
396401
*/
397402
bool validateTransferParams(
398403
const std::vector<AllocatedBuffer::Descriptor>& handles,
399-
const std::vector<Slice>& slices) const;
404+
const std::vector<Slice>& slices,
405+
bool is_multi_buffers = false) const;
400406

401407
/**
402408
* @brief Submit memcpy operation asynchronously
@@ -421,6 +427,12 @@ class TransferSubmitter {
421427
*/
422428
void updateTransferMetrics(const std::vector<Slice>& slices,
423429
Transport::TransferRequest::OpCode op);
430+
431+
std::optional<TransferFuture> submitTransferForMultiBuffers(
432+
const AllocatedBuffer::Descriptor& handle, std::vector<Slice>& slices,
433+
Transport::TransferRequest::OpCode op_code);
434+
std::optional<TransferFuture> submitTransfer(
435+
std::vector<Transport::TransferRequest>& requests);
424436
};
425437

426438
} // namespace mooncake

0 commit comments

Comments
 (0)