From e0fae12ac77f936490c0226479c0062096707ff0 Mon Sep 17 00:00:00 2001 From: Stepan Kargaltsev Date: Tue, 15 Jul 2025 21:21:10 +0300 Subject: [PATCH 1/2] [TransferEngine] add location argument to python register_memory methods --- .../transfer_engine/transfer_engine_py.cpp | 23 ++++++++++++++----- .../transfer_engine/transfer_engine_py.h | 4 ++-- 2 files changed, 19 insertions(+), 8 deletions(-) diff --git a/mooncake-integration/transfer_engine/transfer_engine_py.cpp b/mooncake-integration/transfer_engine/transfer_engine_py.cpp index 589cd9c38..ca3460f8a 100644 --- a/mooncake-integration/transfer_engine/transfer_engine_py.cpp +++ b/mooncake-integration/transfer_engine/transfer_engine_py.cpp @@ -583,14 +583,15 @@ int TransferEnginePy::transferCheckStatus(batch_id_t batch_id) { } int TransferEnginePy::batchRegisterMemory(std::vector buffer_addresses, - std::vector capacities) { + std::vector capacities, + const std::string &location) { pybind11::gil_scoped_release release; auto batch_size = buffer_addresses.size(); std::vector buffers; for (int i = 0; i < batch_size; i ++ ) { buffers.push_back(BufferEntry{(void *)buffer_addresses[i], capacities[i]}); } - return engine_->registerLocalMemoryBatch(buffers, kWildcardLocation); + return engine_->registerLocalMemoryBatch(buffers, location); } int TransferEnginePy::batchUnregisterMemory(std::vector buffer_addresses) { @@ -603,9 +604,9 @@ int TransferEnginePy::batchUnregisterMemory(std::vector buffer_addres return engine_->unregisterLocalMemoryBatch(buffers); } -int TransferEnginePy::registerMemory(uintptr_t buffer_addr, size_t capacity) { +int TransferEnginePy::registerMemory(uintptr_t buffer_addr, size_t capacity, const std::string &location) { char *buffer = reinterpret_cast(buffer_addr); - return engine_->registerLocalMemory(buffer, capacity); + return engine_->registerLocalMemory(buffer, capacity, location); } int TransferEnginePy::unregisterMemory(uintptr_t buffer_addr) { @@ -656,9 +657,19 @@ PYBIND11_MODULE(engine, m) { .def("write_bytes_to_buffer", &TransferEnginePy::writeBytesToBuffer) .def("read_bytes_from_buffer", &TransferEnginePy::readBytesFromBuffer) - .def("register_memory", &TransferEnginePy::registerMemory) + .def("register_memory", + &TransferEnginePy::registerMemory, + py::arg("buffer_addr"), + py::arg("capacity"), + py::arg("location") = kWildcardLocation + ) .def("unregister_memory", &TransferEnginePy::unregisterMemory) - .def("batch_register_memory", &TransferEnginePy::batchRegisterMemory) + .def("batch_register_memory", + &TransferEnginePy::batchRegisterMemory, + py::arg("buffer_addresses"), + py::arg("capacities"), + py::arg("location") = kWildcardLocation + ) .def("batch_unregister_memory", &TransferEnginePy::batchUnregisterMemory) .def("get_first_buffer_address", &TransferEnginePy::getFirstBufferAddress); diff --git a/mooncake-integration/transfer_engine/transfer_engine_py.h b/mooncake-integration/transfer_engine/transfer_engine_py.h index ed04696be..9f949c6ef 100644 --- a/mooncake-integration/transfer_engine/transfer_engine_py.h +++ b/mooncake-integration/transfer_engine/transfer_engine_py.h @@ -130,12 +130,12 @@ class TransferEnginePy { } // FOR EXPERIMENT ONLY - int registerMemory(uintptr_t buffer_addr, size_t capacity); + int registerMemory(uintptr_t buffer_addr, size_t capacity, const std::string &location = kWildcardLocation); // must be called before TransferEnginePy::~TransferEnginePy() int unregisterMemory(uintptr_t buffer_addr); - int batchRegisterMemory(std::vector buffer_addresses, std::vector capacities); + int batchRegisterMemory(std::vector buffer_addresses, std::vector capacities, const std::string &location = kWildcardLocation); int batchUnregisterMemory(std::vector buffer_addresses); From 91f445ec14b0e9477c82a50b058014d8ab3d8185 Mon Sep 17 00:00:00 2001 From: Stepan Kargaltsev Date: Wed, 30 Jul 2025 13:05:29 +0300 Subject: [PATCH 2/2] Add test_register_memory --- .../tests/transfer_engine_initiator_test.py | 81 +++++++++++++++---- 1 file changed, 66 insertions(+), 15 deletions(-) diff --git a/mooncake-wheel/tests/transfer_engine_initiator_test.py b/mooncake-wheel/tests/transfer_engine_initiator_test.py index eb663b346..ebf849989 100644 --- a/mooncake-wheel/tests/transfer_engine_initiator_test.py +++ b/mooncake-wheel/tests/transfer_engine_initiator_test.py @@ -1,8 +1,16 @@ import unittest +import ctypes import os +import random +import string from mooncake.engine import TransferEngine +def generate_random_string(length): + chars = string.ascii_letters + string.digits + string.punctuation + return ''.join(random.choices(chars, k=length)) + + class TestVLLMAdaptorTransfer(unittest.TestCase): @classmethod def setUpClass(cls): @@ -24,11 +32,6 @@ def setUpClass(cls): def test_random_write_circle_times(self): """Test circle times of random string write/read via buffer transfer.""" - import random, string - - def generate_random_string(length): - chars = string.ascii_letters + string.digits + string.punctuation - return ''.join(random.choices(chars, k=length)) adaptor = self.adaptor circles = self.circle @@ -70,11 +73,6 @@ def generate_random_string(length): def test_batch_write_read(self): """Test batch_transfer_sync_write and batch_transfer_sync_read for batch write/read consistency.""" - import random, string - - def generate_random_string(length): - chars = string.ascii_letters + string.digits + string.punctuation - return ''.join(random.choices(chars, k=length)) adaptor = self.adaptor batch_size = 100 # Adjust batch size if needed @@ -133,11 +131,6 @@ def generate_random_string(length): def test_async_batch_write_read(self): """Test batch_transfer_async_write and batch_transfer_async_read for batch write/read consistency.""" - import random, string - - def generate_random_string(length): - chars = string.ascii_letters + string.digits + string.punctuation - return ''.join(random.choices(chars, k=length)) adaptor = self.adaptor batch_size = 100 # Adjust batch size if needed @@ -200,5 +193,63 @@ def generate_random_string(length): print(f"[✓] {circles} rounds of batch_write_async_read passed, batch size {batch_size}.") + def run_test_register_memory(self, dst_addr, with_location): + adaptor = self.adaptor + circles = self.circle + buffer_size = 10 * 1024 + buffer = ctypes.create_string_buffer(buffer_size) + buffer_addr = ctypes.addressof(buffer) + + if with_location: + adaptor.register_memory(buffer_addr, buffer_size, "cpu") + else: + adaptor.register_memory(buffer_addr, buffer_size) + + try: + for i in range(circles): + str_len = random.randint(16, 256) + src_data = generate_random_string(str_len).encode('utf-8') + data_len = len(src_data) + offset = random.randint(0, 1024) + assert offset + data_len <= buffer_size + buffer[offset:offset + data_len] = src_data + + #Write to the remote end + result = adaptor.transfer_sync_write( + self.target_server_name, buffer_addr + offset, dst_addr, data_len + ) + self.assertEqual(result, 0, f"[{i}] WRITE transferSyncExt failed") + + #Clear the local buffer + clear_data = bytes([0] * data_len) + buffer[offset:offset + data_len] = clear_data + + #Read it back from the remote end + dst_offset = random.randint(0, 1024) + assert dst_offset + data_len <= buffer_size + + result = adaptor.transfer_sync_read( + self.target_server_name, buffer_addr + dst_offset, dst_addr, data_len + ) + self.assertEqual(result, 0, f"[{i}] READ transferSyncExt failed") + + #Verify data consistency + read_back = bytes(buffer[dst_offset:dst_offset + data_len]) + self.assertEqual(read_back, src_data, f"[{i}] Data mismatch") + + #Clear the local buffer + buffer[dst_offset:dst_offset + data_len] = clear_data + print(f"[✓] {circles} iterations of random write-read with custom buffer passed successfully ({with_location=}).") + finally: + adaptor.unregister_memory(buffer_addr) + + def test_register_memory(self): + adaptor = self.adaptor + dst_addr = adaptor.get_first_buffer_address(self.target_server_name) + + for with_location in [False, True]: + self.run_test_register_memory(dst_addr, with_location) + + if __name__ == '__main__': unittest.main()