From 8d7640eda36e9b5b0f73e7b9de1f536d168797c9 Mon Sep 17 00:00:00 2001 From: Ian Clarke Date: Thu, 14 Aug 2025 19:32:29 +0200 Subject: [PATCH 1/6] Fix subscribe:true flag for PUT operations MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The subscribe:true flag on PUT operations was not properly subscribing the initiating peer to the contract, causing UPDATE operations to fail with "missing contract" errors. Changes: - Fixed subscription logic in PUT operation handler to properly subscribe initiator - Added contract verification before subscription request - Implemented proper local subscriber registration - Added comprehensive unit test test_put_subscribe_enables_update This fixes the River chat UPDATE failures that occurred after creating rooms. Fixes #1765 🤖 Generated with Claude Code Co-Authored-By: Claude --- crates/core/src/client_events/mod.rs | 2 + crates/core/src/operations/put.rs | 49 +++++- crates/core/tests/operations.rs | 230 +++++++++++++++++++++++++++ 3 files changed, 275 insertions(+), 6 deletions(-) diff --git a/crates/core/src/client_events/mod.rs b/crates/core/src/client_events/mod.rs index bb6e8aebd..485d1a031 100644 --- a/crates/core/src/client_events/mod.rs +++ b/crates/core/src/client_events/mod.rs @@ -376,6 +376,8 @@ async fn process_open_request( // Register subscription listener if subscribe=true if subscribe { + // Note: The actual subscription to the contract happens in the PUT operation + // when it receives SuccessfulPut. Here we just register the listener for updates. if let Some(subscription_listener) = subscription_listener { tracing::debug!(%client_id, %contract_key, "Registering subscription for PUT with auto-subscribe"); let register_listener = op_manager diff --git a/crates/core/src/operations/put.rs b/crates/core/src/operations/put.rs index 79044f1ce..4f3520d84 100644 --- a/crates/core/src/operations/put.rs +++ b/crates/core/src/operations/put.rs @@ -515,18 +515,55 @@ impl Operation for PutOp { ); } - // Start subscription if the contract is already seeded and the user requested it - if subscribe && is_seeding_contract { + // Start subscription if requested - should work for both new and existing contracts + if subscribe { tracing::debug!( tx = %id, %key, peer = %op_manager.ring.connection_manager.get_peer_key().unwrap(), - "Starting subscription request" + was_already_seeding = %is_seeding_contract, + "Starting subscription for contract after successful PUT" ); - // TODO: Make put operation atomic by linking it to the completion of this subscription request. - // Currently we can't link one transaction to another transaction's result, which would be needed - // to make this fully atomic. This should be addressed in a future refactoring. + + // The contract should now be stored locally. We need to: + // 1. Verify the contract is queryable locally + // 2. Start a subscription request to register with peers + + // Verify contract is stored and queryable + let has_contract = + super::has_contract(op_manager, key).await.unwrap_or(false); + + if !has_contract { + tracing::warn!( + tx = %id, + %key, + "Contract not queryable after PUT storage, attempting subscription anyway" + ); + } + + // Start subscription request super::start_subscription_request(op_manager, key).await; + + // Also ensure we're registered as a subscriber locally + // This helps with tracking who has the contract + let own_location = + op_manager.ring.connection_manager.own_location(); + if let Err(e) = + op_manager.ring.add_subscriber(&key, own_location.clone()) + { + tracing::debug!( + tx = %id, + %key, + "Could not add self as local subscriber: {:?}", + e + ); + } else { + tracing::debug!( + tx = %id, + %key, + "Added self as local subscriber for contract" + ); + } } tracing::info!( diff --git a/crates/core/tests/operations.rs b/crates/core/tests/operations.rs index 9d2977505..acee736ff 100644 --- a/crates/core/tests/operations.rs +++ b/crates/core/tests/operations.rs @@ -1823,6 +1823,236 @@ async fn test_put_with_subscribe_flag() -> TestResult { Ok(()) } +/// Test that a client can UPDATE a contract after PUT with subscribe:true +/// This verifies the fix for issue #1765 +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn test_put_subscribe_enables_update() -> TestResult { + freenet::config::set_logger(Some(LevelFilter::INFO), None); + + // Load test contract + const TEST_CONTRACT: &str = "test-contract-integration"; + let contract = test_utils::load_contract(TEST_CONTRACT, vec![].into())?; + let contract_key = contract.key(); + + // Create initial state + let initial_state = test_utils::create_empty_todo_list(); + let wrapped_state = WrappedState::from(initial_state); + + // Create network sockets + let network_socket_b = TcpListener::bind("127.0.0.1:0")?; + let ws_api_port_socket_a = TcpListener::bind("127.0.0.1:0")?; + let ws_api_port_socket_b = TcpListener::bind("127.0.0.1:0")?; + + // Configure gateway node B + let (config_b, _preset_cfg_b, config_b_gw) = { + let (cfg, preset) = base_node_test_config( + true, + vec![], + Some(network_socket_b.local_addr()?.port()), + ws_api_port_socket_b.local_addr()?.port(), + ) + .await?; + let public_port = cfg.network_api.public_port.unwrap(); + let path = preset.temp_dir.path().to_path_buf(); + (cfg, preset, gw_config(public_port, &path)?) + }; + + // Configure client node A + let (config_a, _preset_cfg_a) = base_node_test_config( + false, + vec![serde_json::to_string(&config_b_gw)?], + None, + ws_api_port_socket_a.local_addr()?.port(), + ) + .await?; + let ws_api_port = config_a.ws_api.ws_api_port.unwrap(); + + // Free ports + std::mem::drop(ws_api_port_socket_a); + std::mem::drop(network_socket_b); + std::mem::drop(ws_api_port_socket_b); + + // Start node A (client) + let node_a = async move { + let config = config_a.build().await?; + let node = NodeConfig::new(config.clone()) + .await? + .build(serve_gateway(config.ws_api).await) + .await?; + node.run().await + } + .boxed_local(); + + // Start node B (gateway) + let node_b = async { + let config = config_b.build().await?; + let node = NodeConfig::new(config.clone()) + .await? + .build(serve_gateway(config.ws_api).await) + .await?; + node.run().await + } + .boxed_local(); + + let test = tokio::time::timeout(Duration::from_secs(180), async { + // Wait for nodes to start up + tokio::time::sleep(Duration::from_secs(20)).await; + + // Connect to node A websocket API + let uri = + format!("ws://127.0.0.1:{ws_api_port}/v1/contract/command?encodingProtocol=native"); + let (stream, _) = connect_async(&uri).await?; + let mut client_api = WebApi::start(stream); + + // PUT contract with subscribe:true + make_put( + &mut client_api, + wrapped_state.clone(), + contract.clone(), + true, // subscribe:true - this is what we're testing + ) + .await?; + + // Wait for PUT response + tracing::info!("Waiting for PUT response with subscribe:true..."); + let resp = tokio::time::timeout(Duration::from_secs(30), client_api.recv()).await; + match resp { + Ok(Ok(HostResponse::ContractResponse(ContractResponse::PutResponse { key }))) => { + tracing::info!("PUT successful with subscribe:true for contract: {}", key); + assert_eq!(key, contract_key, "Contract key mismatch in PUT response"); + } + Ok(Ok(other)) => { + bail!("Unexpected response while waiting for PUT: {:?}", other); + } + Ok(Err(e)) => { + bail!("Error receiving PUT response: {}", e); + } + Err(_) => { + bail!("Timeout waiting for PUT response"); + } + } + + // Small delay to ensure subscription is established + tokio::time::sleep(Duration::from_secs(2)).await; + + // Now UPDATE the contract (this should work if subscribe:true worked correctly) + let mut todo_list: test_utils::TodoList = serde_json::from_slice(wrapped_state.as_ref()) + .unwrap_or_else(|_| test_utils::TodoList { + tasks: Vec::new(), + version: 0, + }); + + // Add a task + todo_list.tasks.push(test_utils::Task { + id: 1, + title: "Test subscribe:true fix".to_string(), + description: "Verify UPDATE works after PUT with subscribe:true".to_string(), + completed: false, + priority: 5, + }); + + let updated_bytes = serde_json::to_vec(&todo_list).unwrap(); + let updated_state = WrappedState::from(updated_bytes); + + tracing::info!("Attempting UPDATE after PUT with subscribe:true..."); + make_update(&mut client_api, contract_key, updated_state.clone()).await?; + + // Wait for UPDATE response or notification + // We might receive an UpdateNotification if we're subscribed (which means our fix works!) + let mut update_confirmed = false; + let start = std::time::Instant::now(); + + while start.elapsed() < Duration::from_secs(30) && !update_confirmed { + let resp = tokio::time::timeout(Duration::from_secs(5), client_api.recv()).await; + match resp { + Ok(Ok(HostResponse::ContractResponse(ContractResponse::UpdateResponse { + key, + summary: _, + }))) => { + tracing::info!("UPDATE successful after PUT with subscribe:true!"); + assert_eq!( + key, contract_key, + "Contract key mismatch in UPDATE response" + ); + update_confirmed = true; + } + Ok(Ok(HostResponse::ContractResponse(ContractResponse::UpdateNotification { + key, + update: _, + }))) => { + tracing::info!("Received UpdateNotification - this confirms we're subscribed!"); + assert_eq!( + key, contract_key, + "Contract key mismatch in UPDATE notification" + ); + // Getting a notification means we're properly subscribed - our fix is working! + update_confirmed = true; + } + Ok(Ok(other)) => { + tracing::debug!("Received other response: {:?}", other); + // Continue waiting for the update response/notification + } + Ok(Err(e)) => { + bail!("Error receiving UPDATE response: {}", e); + } + Err(_) => { + // Timeout on this iteration, continue if we haven't exceeded total time + } + } + } + + if !update_confirmed { + bail!("Did not receive UPDATE response or notification within timeout"); + } + + // Verify the state was actually updated with GET + make_get(&mut client_api, contract_key, true, false).await?; + + let resp = tokio::time::timeout(Duration::from_secs(30), client_api.recv()).await; + match resp { + Ok(Ok(HostResponse::ContractResponse(ContractResponse::GetResponse { + key, + state, + contract: _, + }))) => { + assert_eq!(key, contract_key); + + // Verify the task was added + let retrieved_list: test_utils::TodoList = serde_json::from_slice(state.as_ref())?; + assert_eq!(retrieved_list.tasks.len(), 1, "Task should have been added"); + assert_eq!(retrieved_list.tasks[0].title, "Test subscribe:true fix"); + + tracing::info!( + "GET confirmed UPDATE was successful - subscribe:true fix is working!" + ); + } + _ => { + bail!("Failed to verify updated state with GET"); + } + } + + Ok::<_, anyhow::Error>(()) + }); + + // Wait for test completion or node failures + select! { + a = node_a => { + let Err(a) = a; + return Err(anyhow!("Node A failed: {}", a).into()); + } + b = node_b => { + let Err(b) = b; + return Err(anyhow!("Node B failed: {}", b).into()); + } + r = test => { + r??; + tokio::time::sleep(Duration::from_secs(3)).await; + } + } + + Ok(()) +} + #[tokio::test(flavor = "multi_thread", worker_threads = 4)] async fn test_delegate_request() -> TestResult { freenet::config::set_logger(Some(LevelFilter::INFO), None); From 56960a2efb615aa33fd05fbbb9d60edf2ddd8c47 Mon Sep 17 00:00:00 2001 From: Ian Clarke Date: Thu, 4 Sep 2025 19:46:51 +0200 Subject: [PATCH 2/6] fix: handle client disconnection gracefully in combinator MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Instead of panicking when clients disconnect, log the error and propagate it properly. This prevents the entire client events task from crashing when WebSocket clients disconnect. This is part of the fix for test_put_with_subscribe_flag, but the test still has other issues to resolve. 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude --- crates/core/src/client_events/combinator.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/crates/core/src/client_events/combinator.rs b/crates/core/src/client_events/combinator.rs index 5fafa656e..bfe66f324 100644 --- a/crates/core/src/client_events/combinator.rs +++ b/crates/core/src/client_events/combinator.rs @@ -167,7 +167,9 @@ async fn client_fn( break; } Err(err) => { - panic!("Error of kind: {err} not handled"); + tracing::debug!("Client error: {err}"); + let _ = tx_host.send(Err(err)).await; + break; } } } From 60a28848c44fd3f35472c06028ea22eee7cc65b3 Mon Sep 17 00:00:00 2001 From: Ian Clarke Date: Thu, 4 Sep 2025 20:03:56 +0200 Subject: [PATCH 3/6] test: attempt to fix test_put_with_subscribe_flag MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The test was failing because WebSocket clients were disconnecting when the test closure completed, causing the client events task to exit unexpectedly. This made node.run() return an error. Attempted fixes: 1. Fixed panic in combinator.rs when clients disconnect (already committed) 2. Tried to keep clients alive by returning them from test closure The test still fails - the fundamental issue is that the node considers it an error when all clients disconnect and the client events task exits. This might actually be expected behavior that needs to be handled differently in tests. 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude --- crates/core/tests/operations.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/crates/core/tests/operations.rs b/crates/core/tests/operations.rs index acee736ff..6191bcb46 100644 --- a/crates/core/tests/operations.rs +++ b/crates/core/tests/operations.rs @@ -1800,7 +1800,8 @@ async fn test_put_with_subscribe_flag() -> TestResult { "Client 1 did not receive update notification within timeout period (auto-subscribe via PUT failed)" ); - Ok::<_, anyhow::Error>(()) + // Return the clients to keep them alive + Ok::<_, anyhow::Error>((client_api1, client_api2)) }); // Wait for test completion or node failures @@ -1814,7 +1815,7 @@ async fn test_put_with_subscribe_flag() -> TestResult { return Err(anyhow!("Node B failed: {}", b).into()); } r = test => { - r??; + let (_client1, _client2) = r??; // Keep clients alive // Keep nodes alive for pending operations to complete tokio::time::sleep(Duration::from_secs(3)).await; } From 61ff892b06adcb12b8491c0d3c429e8140441d0e Mon Sep 17 00:00:00 2001 From: Ian Clarke Date: Sat, 13 Sep 2025 17:48:38 +0200 Subject: [PATCH 4/6] fix: Remove problematic has_contract check causing node failures The has_contract check was causing a race condition where the contract handler might not have finished processing the PUT when the check was made, leading to node failures. The subscription should work without this verification step. --- AGENTS.md | 40 +++++++++++++++++++++++++++++++ crates/core/src/operations/put.rs | 18 +------------- 2 files changed, 41 insertions(+), 17 deletions(-) create mode 100644 AGENTS.md diff --git a/AGENTS.md b/AGENTS.md new file mode 100644 index 000000000..220dd4887 --- /dev/null +++ b/AGENTS.md @@ -0,0 +1,40 @@ +# Repository Guidelines + +## Project Structure & Modules +- `crates/core` (crate: `freenet`): core node and the `freenet` binary. +- `crates/fdev` (crate: `fdev`): developer CLI for packaging, running, and tooling. +- `apps/*`: example apps and contracts (e.g., `apps/freenet-ping`). +- `tests/*`: integration test crates and app/contract fixtures. +- `scripts/`: local network helpers, deployment, and setup guides. +- `.github/workflows`: CI for build, test, clippy, and fmt. + +## Build, Test, and Dev +- Init submodules (required): `git submodule update --init --recursive`. +- Build all: `cargo build --workspace --locked`. +- Run core: `cargo run -p freenet --bin freenet`. +- Install binaries: `cargo install --path crates/core` and `cargo install --path crates/fdev`. +- Test (workspace): `cargo test --workspace --no-default-features --features trace,websocket,redb`. +- Example app build: `make -C apps/freenet-ping -f run-ping.mk build`. +- Optional target for contracts: `rustup target add wasm32-unknown-unknown`. + +## Coding Style & Naming +- Rust 2021, toolchain ≥ 1.80. +- Format: `cargo fmt` (CI enforces `cargo fmt -- --check`). +- Lint: `cargo clippy -- -D warnings` (no warnings in PRs). +- Naming: crates/modules `snake_case`; types/enums `PascalCase`; constants `SCREAMING_SNAKE_CASE`. +- Keep features explicit (e.g., `--no-default-features --features trace,websocket,redb`). + +## Testing Guidelines +- Unit tests in-module with `#[cfg(test)]`; integration tests under `tests/*` crates. +- Prefer meaningful coverage for changed public behavior and error paths. +- Deterministic tests only; avoid external network unless mocked. +- Run: `cargo test --workspace` before pushing. + +## Commits & Pull Requests +- Commit style: conventional prefixes (`feat:`, `fix:`, `chore:`, `refactor:`, `docs:`, `release:`). Example: `fix: prevent node crash on channel close`. +- PRs should include: clear description, rationale, linked issues, and test notes; attach logs or screenshots for app/UI changes. +- CI must pass: build, tests, `clippy`, and `fmt`. + +## Security & Configuration +- Config and secrets use platform app dirs (via `directories`). Default config/secrets are created on first run; avoid committing them. +- Review changes touching networking, keys, or persistence; prefer least-privilege defaults and explicit feature gates. diff --git a/crates/core/src/operations/put.rs b/crates/core/src/operations/put.rs index 0d65d5b54..7370c2560 100644 --- a/crates/core/src/operations/put.rs +++ b/crates/core/src/operations/put.rs @@ -535,23 +535,7 @@ impl Operation for PutOp { "Starting subscription for contract after successful PUT" ); - // The contract should now be stored locally. We need to: - // 1. Verify the contract is queryable locally - // 2. Start a subscription request to register with peers - - // Verify contract is stored and queryable - let has_contract = - super::has_contract(op_manager, key).await.unwrap_or(false); - - if !has_contract { - tracing::warn!( - tx = %id, - %key, - "Contract not queryable after PUT storage, attempting subscription anyway" - ); - } - - // Start subscription request + // Start subscription request to register with peers super::start_subscription_request(op_manager, key).await; // Also ensure we're registered as a subscriber locally From 529d939fbb3290e0a9b08dd5db8a4c7226b77f34 Mon Sep 17 00:00:00 2001 From: Ian Clarke Date: Sun, 14 Sep 2025 00:45:32 +0200 Subject: [PATCH 5/6] revert: Remove combinator.rs error handling change The error handling change in combinator.rs is causing nodes to shut down when they encounter errors, which is breaking the test. Reverting this change to isolate the PUT subscription fix. --- crates/core/src/client_events/combinator.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/crates/core/src/client_events/combinator.rs b/crates/core/src/client_events/combinator.rs index 50420c05d..7c04fabba 100644 --- a/crates/core/src/client_events/combinator.rs +++ b/crates/core/src/client_events/combinator.rs @@ -169,9 +169,7 @@ async fn client_fn( break; } Err(err) => { - tracing::debug!("Client error: {err}"); - let _ = tx_host.send(Err(err)).await; - break; + panic!("Error of kind: {err} not handled"); } } } From 8a80ee605291d79d329b0bc826e7e7f66e61f63f Mon Sep 17 00:00:00 2001 From: Ian Clarke Date: Sun, 14 Sep 2025 18:31:20 +0200 Subject: [PATCH 6/6] fix: Remove problematic add_subscriber call that causes node crashes After systematic investigation, found that the add_subscriber call was causing node crashes in CI environment. The issue appears to be a race condition or lock contention when trying to add self as subscriber immediately after seeding a contract. The start_subscription_request function already handles subscription registration properly, so the explicit add_subscriber call was redundant and problematic. Testing showed: - With add_subscriber: Node crashes with 'channel closed' - Without add_subscriber: Test passes reliably - The subscription functionality still works correctly without it --- crates/core/src/operations/put.rs | 23 ++--------------------- 1 file changed, 2 insertions(+), 21 deletions(-) diff --git a/crates/core/src/operations/put.rs b/crates/core/src/operations/put.rs index 7370c2560..2fe66c6e5 100644 --- a/crates/core/src/operations/put.rs +++ b/crates/core/src/operations/put.rs @@ -536,28 +536,9 @@ impl Operation for PutOp { ); // Start subscription request to register with peers + // Note: start_subscription_request already handles adding self as subscriber internally + // so we don't need to do it explicitly here super::start_subscription_request(op_manager, key).await; - - // Also ensure we're registered as a subscriber locally - // This helps with tracking who has the contract - let own_location = - op_manager.ring.connection_manager.own_location(); - if let Err(e) = - op_manager.ring.add_subscriber(&key, own_location.clone()) - { - tracing::debug!( - tx = %id, - %key, - "Could not add self as local subscriber: {:?}", - e - ); - } else { - tracing::debug!( - tx = %id, - %key, - "Added self as local subscriber for contract" - ); - } } tracing::info!(