Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 49 additions & 1 deletion src/bindings/rust/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ use bindings::{
nixl_capi_query_resp_list_get_params, nixl_capi_prep_xfer_dlist, nixl_capi_release_xfer_dlist_handle,
nixl_capi_make_xfer_req, nixl_capi_get_local_partial_md,
nixl_capi_send_local_partial_md, nixl_capi_query_xfer_backend, nixl_capi_opt_args_set_ip_addr,
nixl_capi_opt_args_set_port
nixl_capi_opt_args_set_port, nixl_capi_get_xfer_telemetry
};

// Re-export status codes
Expand All @@ -81,6 +81,7 @@ pub use bindings::{
nixl_capi_status_t_NIXL_CAPI_ERROR_INVALID_PARAM as NIXL_CAPI_ERROR_INVALID_PARAM,
nixl_capi_status_t_NIXL_CAPI_IN_PROG as NIXL_CAPI_IN_PROG,
nixl_capi_status_t_NIXL_CAPI_SUCCESS as NIXL_CAPI_SUCCESS,
nixl_capi_status_t_NIXL_CAPI_ERROR_NO_TELEMETRY as NIXL_CAPI_ERROR_NO_TELEMETRY
};

mod agent;
Expand Down Expand Up @@ -118,6 +119,8 @@ pub enum NixlError {
FailedToCreateXferDlistHandle,
#[error("Failed to create backend")]
FailedToCreateBackend,
#[error("Telemetry is not enabled or transfer is not complete")]
NoTelemetry,
}

/// A safe wrapper around NIXL memory list
Expand Down Expand Up @@ -199,6 +202,51 @@ pub struct Backend {
unsafe impl Send for Backend {}
unsafe impl Sync for Backend {}

/// Transfer telemetry data wrapper
#[derive(Debug)]
pub struct XferTelemetry {
/// Start time in microseconds since epoch
pub start_time_us: u64,
/// Post operation duration in microseconds
pub post_duration_us: u64,
/// Transfer duration in microseconds
pub xfer_duration_us: u64,
/// Total bytes transferred
pub total_bytes: u64,
/// Number of descriptors
pub desc_count: u64,
}

impl XferTelemetry {
/// Get the start time as a Duration since Unix epoch
pub fn start_time(&self) -> std::time::Duration {
std::time::Duration::from_micros(self.start_time_us)
}

/// Get the post operation duration
pub fn post_duration(&self) -> std::time::Duration {
std::time::Duration::from_micros(self.post_duration_us)
}

/// Get the transfer duration
pub fn xfer_duration(&self) -> std::time::Duration {
std::time::Duration::from_micros(self.xfer_duration_us)
}

/// Get the total duration (post + transfer)
pub fn total_duration(&self) -> std::time::Duration {
std::time::Duration::from_micros(self.post_duration_us + self.xfer_duration_us)
}

/// Calculate the transfer rate in bytes per second
pub fn transfer_rate_bps(&self) -> f64 {
if self.xfer_duration_us == 0 {
0.0
} else {
(self.total_bytes as f64) / (self.xfer_duration_us as f64 / 1_000_000.0)
}
}
}

/// A safe wrapper around NIXL optional arguments
pub struct OptArgs {
Expand Down
53 changes: 53 additions & 0 deletions src/bindings/rust/src/xfer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,59 @@ impl XferRequest {
pub(crate) fn handle(&self) -> *mut bindings::nixl_capi_xfer_req_s {
self.inner.as_ptr()
}

/// Gets telemetry data for this transfer request
///
/// # Returns
/// Transfer telemetry data containing timing and performance metrics
///
/// # Errors
/// * `NoTelemetry` - If telemetry is not enabled or transfer is not complete
/// * `InvalidParam` - If the request handle is invalid
/// * `BackendError` - If there was an error retrieving telemetry data
pub fn get_telemetry(&self) -> Result<XferTelemetry, NixlError> {
tracing::trace!("Getting transfer telemetry from request");
let mut telemetry = bindings::nixl_capi_xfer_telemetry_s {
start_time_us: 0,
post_duration_us: 0,
xfer_duration_us: 0,
total_bytes: 0,
desc_count: 0,
};

let status = unsafe {
nixl_capi_get_xfer_telemetry(
self.agent.write().unwrap().handle.as_ptr(),
self.handle(),
&mut telemetry,
)
};

match status {
NIXL_CAPI_SUCCESS => {
tracing::trace!("Successfully retrieved transfer telemetry from request");
Ok(XferTelemetry {
start_time_us: telemetry.start_time_us,
post_duration_us: telemetry.post_duration_us,
xfer_duration_us: telemetry.xfer_duration_us,
total_bytes: telemetry.total_bytes,
desc_count: telemetry.desc_count,
})
},
NIXL_CAPI_IN_PROG => {
tracing::error!(error = "transfer_not_complete", "Transfer not complete");
Err(NixlError::NoTelemetry)
},
NIXL_CAPI_ERROR_NO_TELEMETRY => {
tracing::error!(error = "telemetry_not_enabled", "Telemetry not enabled");
Err(NixlError::NoTelemetry)
},
_ => {
tracing::error!(error = "backend_error", "Failed to get transfer telemetry from request");
Err(NixlError::BackendError)
}
}
}
}

// SAFETY: XferRequest can be sent between threads safely
Expand Down
217 changes: 217 additions & 0 deletions src/bindings/rust/tests/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1368,3 +1368,220 @@ fn test_query_xfer_backend_invalid_request() {
assert!(xfer_req_result.is_err(), "Transfer request creation should fail for non-existent agent");
}
}

// Tests for get_xfer_telemetry API
#[test]
fn test_get_xfer_telemetry_success() {
env::set_var("NIXL_TELEMETRY_ENABLE", "1");

let (agent1, opt_args) = create_agent_with_backend("agent1").expect("Failed to create agent");
let (agent2, opt_args_remote) = create_agent_with_backend("agent2").expect("Failed to create agent");

let mut storage_list = create_storage_list(&agent1, &opt_args, 1);
let mut remote_storage_list = create_storage_list(&agent2, &opt_args_remote, 1);

{
let local_dlist = create_dlist(&mut storage_list).expect("Failed to create descriptor list");
let remote_dlist = create_dlist(&mut remote_storage_list).expect("Failed to create descriptor list");

exchange_metadata(&agent1, &agent2).expect("Failed to exchange metadata");

let xfer_req = agent1.create_xfer_req(
XferOp::Write,
&local_dlist,
&remote_dlist,
"agent2",
None
).expect("Failed to create transfer request");

let result = agent1.post_xfer_req(&xfer_req, Some(&opt_args));
assert!(result.is_ok(), "post_xfer_req failed with error: {:?}", result.err());

// Wait for transfer to complete
loop {
match agent1.get_xfer_status(&xfer_req) {
Ok(XferStatus::Success) => break,
Ok(XferStatus::InProgress) => {
std::thread::sleep(std::time::Duration::from_millis(10));
continue;
}
Err(e) => panic!("Failed to get transfer status: {:?}", e),
}
}

let telemetry_result = xfer_req.get_telemetry();
assert!(telemetry_result.is_ok(), "get_xfer_telemetry failed with error: {:?}", telemetry_result.err());

let telemetry = telemetry_result.unwrap();
assert!(telemetry.start_time_us > 0, "Start time should be greater than 0");
assert!(telemetry.total_bytes > 0, "Total bytes should be greater than 0");
assert!(telemetry.desc_count > 0, "Descriptor count should be greater than 0");

// Test convenience methods
let start_time = telemetry.start_time();
assert!(start_time.as_micros() == telemetry.start_time_us as u128);

let post_duration = telemetry.post_duration();
assert!(post_duration.as_micros() == telemetry.post_duration_us as u128);

let xfer_duration = telemetry.xfer_duration();
assert!(xfer_duration.as_micros() == telemetry.xfer_duration_us as u128);

let total_duration = telemetry.total_duration();
assert!(total_duration.as_micros() == (telemetry.post_duration_us + telemetry.xfer_duration_us) as u128);

// Test transfer rate calculation
let rate = telemetry.transfer_rate_bps();
if telemetry.xfer_duration_us > 0 {
assert!(rate > 0.0, "Transfer rate should be positive when transfer duration > 0");
}

println!("Telemetry data: {:?}", telemetry);
println!("Transfer rate: {:.2} MB/s", rate / 1_000_000.0);
}
}

#[test]
fn test_get_xfer_telemetry_from_request() {
env::set_var("NIXL_TELEMETRY_ENABLE", "1");

let (agent1, opt_args) = create_agent_with_backend("agent1").expect("Failed to create agent");
let (agent2, opt_args_remote) = create_agent_with_backend("agent2").expect("Failed to create agent");

let mut storage_list = create_storage_list(&agent1, &opt_args, 1);
let mut remote_storage_list = create_storage_list(&agent2, &opt_args_remote, 1);

{
let local_dlist = create_dlist(&mut storage_list).expect("Failed to create descriptor list");
let remote_dlist = create_dlist(&mut remote_storage_list).expect("Failed to create descriptor list");

exchange_metadata(&agent1, &agent2).expect("Failed to exchange metadata");

let xfer_req = agent1.create_xfer_req(
XferOp::Write,
&local_dlist,
&remote_dlist,
"agent2",
None
).expect("Failed to create transfer request");

let result = agent1.post_xfer_req(&xfer_req, Some(&opt_args));
assert!(result.is_ok(), "post_xfer_req failed with error: {:?}", result.err());

// Wait for transfer to complete
loop {
match agent1.get_xfer_status(&xfer_req) {
Ok(XferStatus::Success) => break,
Ok(XferStatus::InProgress) => {
std::thread::sleep(std::time::Duration::from_millis(10));
continue;
}
Err(e) => panic!("Failed to get transfer status: {:?}", e),
}
}

let telemetry_result = xfer_req.get_telemetry();
assert!(telemetry_result.is_ok(), "get_telemetry from request failed with error: {:?}", telemetry_result.err());

let telemetry = telemetry_result.unwrap();
assert!(telemetry.start_time_us > 0, "Start time should be greater than 0");
assert!(telemetry.total_bytes > 0, "Total bytes should be greater than 0");
assert!(telemetry.desc_count > 0, "Descriptor count should be greater than 0");

println!("Telemetry data from request: {:?}", telemetry);
}
}

#[test]
fn test_get_xfer_telemetry_without_telemetry_enabled() {
env::remove_var("NIXL_TELEMETRY_ENABLE");

let (agent1, opt_args) = create_agent_with_backend("agent1").expect("Failed to create agent");
let (agent2, opt_args_remote) = create_agent_with_backend("agent2").expect("Failed to create agent");

// Create descriptor lists
let mut storage_list = create_storage_list(&agent1, &opt_args, 1);
let mut remote_storage_list = create_storage_list(&agent2, &opt_args_remote, 1);

{
let local_dlist = create_dlist(&mut storage_list).expect("Failed to create descriptor list");
let remote_dlist = create_dlist(&mut remote_storage_list).expect("Failed to create descriptor list");

exchange_metadata(&agent1, &agent2).expect("Failed to exchange metadata");

let xfer_req = agent1.create_xfer_req(
XferOp::Write,
&local_dlist,
&remote_dlist,
"agent2",
None
).expect("Failed to create transfer request");

let result = agent1.post_xfer_req(&xfer_req, Some(&opt_args));
assert!(result.is_ok(), "post_xfer_req failed with error: {:?}", result.err());

// Wait for transfer to complete
loop {
match agent1.get_xfer_status(&xfer_req) {
Ok(XferStatus::Success) => break,
Ok(XferStatus::InProgress) => {
std::thread::sleep(std::time::Duration::from_millis(10));
continue;
}
Err(e) => panic!("Failed to get transfer status: {:?}", e),
}
}

// Try to get telemetry data - should fail with NoTelemetry
let telemetry_result = xfer_req.get_telemetry();
assert!(telemetry_result.is_err(), "get_xfer_telemetry should fail when telemetry is disabled");

match telemetry_result.err().unwrap() {
NixlError::NoTelemetry => {
println!("Correctly received NoTelemetry error");
}
other => panic!("Expected NoTelemetry error, got: {:?}", other),
}
}
}

#[test]
fn test_get_xfer_telemetry_before_posting() {
env::set_var("NIXL_TELEMETRY_ENABLE", "1");

let (agent1, opt_args) = create_agent_with_backend("agent1").expect("Failed to create agent");
let (agent2, opt_args_remote) = create_agent_with_backend("agent2").expect("Failed to create agent");

// Create descriptor lists
let mut storage_list = create_storage_list(&agent1, &opt_args, 1);
let mut remote_storage_list = create_storage_list(&agent2, &opt_args_remote, 1);

{
let local_dlist = create_dlist(&mut storage_list).expect("Failed to create descriptor list");
let remote_dlist = create_dlist(&mut remote_storage_list).expect("Failed to create descriptor list");

exchange_metadata(&agent1, &agent2).expect("Failed to exchange metadata");

// Create transfer request
let xfer_req = agent1.create_xfer_req(
XferOp::Write,
&local_dlist,
&remote_dlist,
"agent2",
None
).expect("Failed to create transfer request");

// Try to get telemetry before posting the request - should fail
let telemetry_result = xfer_req.get_telemetry();
assert!(telemetry_result.is_err(), "get_xfer_telemetry should fail before transfer is posted");
let error = telemetry_result.err().unwrap();
match error {
NixlError::NoTelemetry | NixlError::BackendError => {
println!("Got expected error before posting: {:?}", error);
}
other => panic!("Expected NoTelemetry or BackendError, got: {:?}", other),
}

println!("Successfully tested telemetry before posting - got expected error");
}
}
Loading