Skip to content

test: verify reverted code passes CI #1712

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
208 changes: 192 additions & 16 deletions crates/core/tests/operations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,18 +128,38 @@ async fn get_contract(

#[tokio::test(flavor = "multi_thread")]
async fn test_put_contract() -> TestResult {
let test_start = std::time::Instant::now();
tracing::info!("=== TEST START: test_put_contract ===");

freenet::config::set_logger(Some(LevelFilter::INFO), None);
const TEST_CONTRACT: &str = "test-contract-integration";

tracing::info!("Loading test contract...");
let contract_load_start = std::time::Instant::now();
let contract = test_utils::load_contract(TEST_CONTRACT, vec![].into())?;
let contract_key = contract.key();
tracing::info!(
"Contract loaded in {:?}, key: {}",
contract_load_start.elapsed(),
contract_key
);

let initial_state = test_utils::create_empty_todo_list();
let wrapped_state = WrappedState::from(initial_state);

tracing::info!("Binding network sockets...");
let network_socket_b = TcpListener::bind("127.0.0.1:0")?;
let ws_api_port_socket_a = TcpListener::bind("127.0.0.1:0")?;
let ws_api_port_socket_b = TcpListener::bind("127.0.0.1:0")?;
tracing::info!(
"Network sockets bound - B: {:?}, WS A: {:?}, WS B: {:?}",
network_socket_b.local_addr()?,
ws_api_port_socket_a.local_addr()?,
ws_api_port_socket_b.local_addr()?
);

tracing::info!("Configuring gateway node B...");
let config_start = std::time::Instant::now();
let (config_b, preset_cfg_b, config_b_gw) = {
let (cfg, preset) = base_node_test_config(
true,
Expand All @@ -153,7 +173,14 @@ async fn test_put_contract() -> TestResult {
(cfg, preset, gw_config(public_port, &path)?)
};
let ws_api_port_peer_b = config_b.ws_api.ws_api_port.unwrap();
tracing::info!(
"Gateway B configured in {:?}, port: {}",
config_start.elapsed(),
ws_api_port_peer_b
);

tracing::info!("Configuring client node A...");
let config_a_start = std::time::Instant::now();
let (config_a, preset_cfg_a) = base_node_test_config(
false,
vec![serde_json::to_string(&config_b_gw)?],
Expand All @@ -162,154 +189,280 @@ async fn test_put_contract() -> TestResult {
)
.await?;
let ws_api_port_peer_a = config_a.ws_api.ws_api_port.unwrap();
tracing::info!(
"Client A configured in {:?}, port: {}",
config_a_start.elapsed(),
ws_api_port_peer_a
);

tracing::info!("Node A data dir: {:?}", preset_cfg_b.temp_dir.path());
tracing::info!("Node B data dir: {:?}", preset_cfg_a.temp_dir.path());
tracing::info!(
"Node B (gateway) data dir: {:?}",
preset_cfg_b.temp_dir.path()
);
tracing::info!(
"Node A (client) data dir: {:?}",
preset_cfg_a.temp_dir.path()
);

std::mem::drop(ws_api_port_socket_a); // Free the port so it does not fail on initialization
let node_a = async move {
tracing::info!("Node A: Building configuration...");
let build_start = std::time::Instant::now();
let config = config_a.build().await?;
tracing::info!("Node A: Config built in {:?}", build_start.elapsed());

tracing::info!("Node A: Creating node instance...");
let node_create_start = std::time::Instant::now();
let node = NodeConfig::new(config.clone())
.await?
.build(serve_gateway(config.ws_api).await)
.await?;
tracing::info!("Node A: Node created in {:?}", node_create_start.elapsed());

tracing::info!("Node A: Starting node...");
node.run().await
}
.boxed_local();

std::mem::drop(network_socket_b); // Free the port so it does not fail on initialization
std::mem::drop(ws_api_port_socket_b);
let node_b = async {
tracing::info!("Node B: Building configuration...");
let build_start = std::time::Instant::now();
let config = config_b.build().await?;
tracing::info!("Node B: Config built in {:?}", build_start.elapsed());

tracing::info!("Node B: Creating node instance...");
let node_create_start = std::time::Instant::now();
let node = NodeConfig::new(config.clone())
.await?
.build(serve_gateway(config.ws_api).await)
.await?;
tracing::info!("Node B: Node created in {:?}", node_create_start.elapsed());

tracing::info!("Node B: Starting node...");
node.run().await
}
.boxed_local();

let test = tokio::time::timeout(Duration::from_secs(120), async {
let test_phase_start = std::time::Instant::now();

// Wait for nodes to start up
tracing::info!("Waiting for nodes to start up...");
tracing::info!("Test phase: Waiting for nodes to start up...");
let startup_wait_start = std::time::Instant::now();
tokio::time::sleep(Duration::from_secs(15)).await;
tracing::info!("Nodes should be ready, proceeding with test...");
tracing::info!(
"Test phase: Node startup wait completed in {:?}",
startup_wait_start.elapsed()
);

// Connect to node A's websocket API
tracing::info!("Test phase: Connecting to node A websocket...");
let ws_connect_start = std::time::Instant::now();
let uri = format!(
"ws://127.0.0.1:{ws_api_port_peer_a}/v1/contract/command?encodingProtocol=native"
);
let (stream, _) = connect_async(&uri).await?;
let mut client_api_a = WebApi::start(stream);
tracing::info!(
"Test phase: WebSocket connected in {:?}",
ws_connect_start.elapsed()
);

tracing::info!("Test phase: Making PUT request...");
let put_start = std::time::Instant::now();
make_put(
&mut client_api_a,
wrapped_state.clone(),
contract.clone(),
false,
)
.await?;
tracing::info!("Test phase: PUT request sent in {:?}", put_start.elapsed());

// Wait for put response
tracing::info!("Test phase: Waiting for PUT response...");
let put_response_start = std::time::Instant::now();
let resp = tokio::time::timeout(Duration::from_secs(60), client_api_a.recv()).await;
match resp {
Ok(Ok(HostResponse::ContractResponse(ContractResponse::PutResponse { key }))) => {
tracing::info!(
"Test phase: PUT response received in {:?}, key: {}",
put_response_start.elapsed(),
key
);
assert_eq!(key, contract_key);
}
Ok(Ok(other)) => {
tracing::warn!("unexpected response while waiting for put: {:?}", other);
tracing::warn!("Test phase: Unexpected response while waiting for put (received in {:?}): {:?}",
put_response_start.elapsed(), other);
}
Ok(Err(e)) => {
tracing::error!(
"Test phase: Error receiving put response after {:?}: {}",
put_response_start.elapsed(),
e
);
bail!("Error receiving put response: {}", e);
}
Err(_) => {
tracing::error!(
"Test phase: Timeout waiting for put response after {:?}",
put_response_start.elapsed()
);
bail!("Timeout waiting for put response");
}
}

{
// Wait for get response from node A
tracing::info!("getting contract from A");
tracing::info!("Test phase: Getting contract from node A...");
let get_a_start = std::time::Instant::now();
let (response_contract, response_state) =
get_contract(&mut client_api_a, contract_key, &preset_cfg_b.temp_dir).await?;
let response_key = response_contract.key();
tracing::info!(
"Test phase: GET from node A completed in {:?}",
get_a_start.elapsed()
);

// Verify the responses
assert_eq!(response_key, contract_key);
assert_eq!(response_contract, contract);
assert_eq!(response_state, wrapped_state);
tracing::info!("Test phase: GET from node A verified successfully");
}

{
// Connect to node B's websocket API
tracing::info!("Test phase: Connecting to node B websocket...");
let ws_b_connect_start = std::time::Instant::now();
let uri = format!(
"ws://127.0.0.1:{ws_api_port_peer_b}/v1/contract/command?encodingProtocol=native"
);
let (stream, _) = connect_async(&uri).await?;
let mut client_api_b = WebApi::start(stream);
tracing::info!(
"Test phase: Node B WebSocket connected in {:?}",
ws_b_connect_start.elapsed()
);

// Wait for get response from node B
tracing::info!("Test phase: Getting contract from node B...");
let get_b_start = std::time::Instant::now();
let (response_contract, response_state) =
get_contract(&mut client_api_b, contract_key, &preset_cfg_b.temp_dir).await?;
let response_key = response_contract.key();
tracing::info!(
"Test phase: GET from node B completed in {:?}",
get_b_start.elapsed()
);

// Verify the responses
assert_eq!(response_key, contract_key);
assert_eq!(response_contract, contract);
assert_eq!(response_state, wrapped_state);
tracing::info!("Test phase: GET from node B verified successfully");

// Properly close the client
tracing::info!("Test phase: Disconnecting client B...");
client_api_b
.send(ClientRequest::Disconnect { cause: None })
.await?;
tokio::time::sleep(Duration::from_millis(100)).await;
}

// Close the first client as well
tracing::info!("Test phase: Disconnecting client A...");
client_api_a
.send(ClientRequest::Disconnect { cause: None })
.await?;
tokio::time::sleep(Duration::from_millis(100)).await;

tracing::info!(
"Test phase: All operations completed successfully in {:?}",
test_phase_start.elapsed()
);
Ok::<_, anyhow::Error>(())
});

select! {
a = node_a => {
let Err(a) = a;
tracing::error!("Node A failed: {}", a);
let total_elapsed = test_start.elapsed();
tracing::info!("=== TEST END: test_put_contract - Total time: {:?} (Node A failed) ===", total_elapsed);
return Err(anyhow!(a).into());
}
b = node_b => {
let Err(b) = b;
tracing::error!("Node B failed: {}", b);
let total_elapsed = test_start.elapsed();
tracing::info!("=== TEST END: test_put_contract - Total time: {:?} (Node B failed) ===", total_elapsed);
return Err(anyhow!(b).into());
}
r = test => {
r??;
// Give time for cleanup before dropping nodes
tokio::time::sleep(Duration::from_secs(3)).await;
match r {
Ok(Ok(())) => {
tracing::info!("Test completed successfully!");
// Give time for cleanup before dropping nodes
tokio::time::sleep(Duration::from_secs(3)).await;
let total_elapsed = test_start.elapsed();
tracing::info!("=== TEST END: test_put_contract - Total time: {:?} (Success) ===", total_elapsed);
Ok(())
}
Ok(Err(e)) => {
tracing::error!("Test failed with error: {}", e);
let total_elapsed = test_start.elapsed();
tracing::info!("=== TEST END: test_put_contract - Total time: {:?} (Test error) ===", total_elapsed);
Err(e.into())
}
Err(_) => {
tracing::error!("Test timed out after 120 seconds");
let total_elapsed = test_start.elapsed();
tracing::info!("=== TEST END: test_put_contract - Total time: {:?} (Timeout) ===", total_elapsed);
Err(anyhow!("Test timeout").into())
}
}
}
}

Ok(())
}

#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn test_update_contract() -> TestResult {
let test_start = std::time::Instant::now();
tracing::info!("=== TEST START: test_update_contract ===");

freenet::config::set_logger(Some(LevelFilter::INFO), None);

// Load test contract
const TEST_CONTRACT: &str = "test-contract-integration";
tracing::info!("Loading test contract...");
let contract_load_start = std::time::Instant::now();
let contract = test_utils::load_contract(TEST_CONTRACT, vec![].into())?;
let contract_key = contract.key();
tracing::info!(
"Contract loaded in {:?}, key: {}",
contract_load_start.elapsed(),
contract_key
);

// Create initial state with empty todo list
let initial_state = test_utils::create_empty_todo_list();
let wrapped_state = WrappedState::from(initial_state);

// Create network sockets
tracing::info!("Binding network sockets...");
let network_socket_b = TcpListener::bind("127.0.0.1:0")?;
let ws_api_port_socket_a = TcpListener::bind("127.0.0.1:0")?;
let ws_api_port_socket_b = TcpListener::bind("127.0.0.1:0")?;
tracing::info!(
"Network sockets bound - B: {:?}, WS A: {:?}, WS B: {:?}",
network_socket_b.local_addr()?,
ws_api_port_socket_a.local_addr()?,
ws_api_port_socket_b.local_addr()?
);

// Configure gateway node B
let (config_b, preset_cfg_b, config_b_gw) = {
Expand Down Expand Up @@ -513,20 +666,43 @@ async fn test_update_contract() -> TestResult {
select! {
a = node_a => {
let Err(a) = a;
tracing::error!("Node A failed: {}", a);
let total_elapsed = test_start.elapsed();
tracing::info!("=== TEST END: test_update_contract - Total time: {:?} (Node A failed) ===", total_elapsed);
return Err(anyhow!("Node A failed: {}", a).into());
}
b = node_b => {
let Err(b) = b;
tracing::error!("Node B failed: {}", b);
let total_elapsed = test_start.elapsed();
tracing::info!("=== TEST END: test_update_contract - Total time: {:?} (Node B failed) ===", total_elapsed);
return Err(anyhow!("Node B failed: {}", b).into());
}
r = test => {
r??;
// Keep nodes alive for pending operations to complete
tokio::time::sleep(Duration::from_secs(3)).await;
match r {
Ok(Ok(())) => {
tracing::info!("Test completed successfully!");
// Keep nodes alive for pending operations to complete
tokio::time::sleep(Duration::from_secs(3)).await;
let total_elapsed = test_start.elapsed();
tracing::info!("=== TEST END: test_update_contract - Total time: {:?} (Success) ===", total_elapsed);
Ok(())
}
Ok(Err(e)) => {
tracing::error!("Test failed with error: {}", e);
let total_elapsed = test_start.elapsed();
tracing::info!("=== TEST END: test_update_contract - Total time: {:?} (Test error) ===", total_elapsed);
Err(e.into())
}
Err(_) => {
tracing::error!("Test timed out after 120 seconds");
let total_elapsed = test_start.elapsed();
tracing::info!("=== TEST END: test_update_contract - Total time: {:?} (Timeout) ===", total_elapsed);
Err(anyhow!("Test timeout").into())
}
}
}
}

Ok(())
}

#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
Expand Down
Loading
Loading