Skip to content

Commit 8d7640e

Browse files
sanityclaude
andcommitted
Fix subscribe:true flag for PUT operations
The subscribe:true flag on PUT operations was not properly subscribing the initiating peer to the contract, causing UPDATE operations to fail with "missing contract" errors. Changes: - Fixed subscription logic in PUT operation handler to properly subscribe initiator - Added contract verification before subscription request - Implemented proper local subscriber registration - Added comprehensive unit test test_put_subscribe_enables_update This fixes the River chat UPDATE failures that occurred after creating rooms. Fixes #1765 🤖 Generated with Claude Code Co-Authored-By: Claude <[email protected]>
1 parent 45644fd commit 8d7640e

File tree

3 files changed

+275
-6
lines changed

3 files changed

+275
-6
lines changed

crates/core/src/client_events/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -376,6 +376,8 @@ async fn process_open_request(
376376

377377
// Register subscription listener if subscribe=true
378378
if subscribe {
379+
// Note: The actual subscription to the contract happens in the PUT operation
380+
// when it receives SuccessfulPut. Here we just register the listener for updates.
379381
if let Some(subscription_listener) = subscription_listener {
380382
tracing::debug!(%client_id, %contract_key, "Registering subscription for PUT with auto-subscribe");
381383
let register_listener = op_manager

crates/core/src/operations/put.rs

Lines changed: 43 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -515,18 +515,55 @@ impl Operation for PutOp {
515515
);
516516
}
517517

518-
// Start subscription if the contract is already seeded and the user requested it
519-
if subscribe && is_seeding_contract {
518+
// Start subscription if requested - should work for both new and existing contracts
519+
if subscribe {
520520
tracing::debug!(
521521
tx = %id,
522522
%key,
523523
peer = %op_manager.ring.connection_manager.get_peer_key().unwrap(),
524-
"Starting subscription request"
524+
was_already_seeding = %is_seeding_contract,
525+
"Starting subscription for contract after successful PUT"
525526
);
526-
// TODO: Make put operation atomic by linking it to the completion of this subscription request.
527-
// Currently we can't link one transaction to another transaction's result, which would be needed
528-
// to make this fully atomic. This should be addressed in a future refactoring.
527+
528+
// The contract should now be stored locally. We need to:
529+
// 1. Verify the contract is queryable locally
530+
// 2. Start a subscription request to register with peers
531+
532+
// Verify contract is stored and queryable
533+
let has_contract =
534+
super::has_contract(op_manager, key).await.unwrap_or(false);
535+
536+
if !has_contract {
537+
tracing::warn!(
538+
tx = %id,
539+
%key,
540+
"Contract not queryable after PUT storage, attempting subscription anyway"
541+
);
542+
}
543+
544+
// Start subscription request
529545
super::start_subscription_request(op_manager, key).await;
546+
547+
// Also ensure we're registered as a subscriber locally
548+
// This helps with tracking who has the contract
549+
let own_location =
550+
op_manager.ring.connection_manager.own_location();
551+
if let Err(e) =
552+
op_manager.ring.add_subscriber(&key, own_location.clone())
553+
{
554+
tracing::debug!(
555+
tx = %id,
556+
%key,
557+
"Could not add self as local subscriber: {:?}",
558+
e
559+
);
560+
} else {
561+
tracing::debug!(
562+
tx = %id,
563+
%key,
564+
"Added self as local subscriber for contract"
565+
);
566+
}
530567
}
531568

532569
tracing::info!(

crates/core/tests/operations.rs

Lines changed: 230 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1823,6 +1823,236 @@ async fn test_put_with_subscribe_flag() -> TestResult {
18231823
Ok(())
18241824
}
18251825

1826+
/// Test that a client can UPDATE a contract after PUT with subscribe:true
1827+
/// This verifies the fix for issue #1765
1828+
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
1829+
async fn test_put_subscribe_enables_update() -> TestResult {
1830+
freenet::config::set_logger(Some(LevelFilter::INFO), None);
1831+
1832+
// Load test contract
1833+
const TEST_CONTRACT: &str = "test-contract-integration";
1834+
let contract = test_utils::load_contract(TEST_CONTRACT, vec![].into())?;
1835+
let contract_key = contract.key();
1836+
1837+
// Create initial state
1838+
let initial_state = test_utils::create_empty_todo_list();
1839+
let wrapped_state = WrappedState::from(initial_state);
1840+
1841+
// Create network sockets
1842+
let network_socket_b = TcpListener::bind("127.0.0.1:0")?;
1843+
let ws_api_port_socket_a = TcpListener::bind("127.0.0.1:0")?;
1844+
let ws_api_port_socket_b = TcpListener::bind("127.0.0.1:0")?;
1845+
1846+
// Configure gateway node B
1847+
let (config_b, _preset_cfg_b, config_b_gw) = {
1848+
let (cfg, preset) = base_node_test_config(
1849+
true,
1850+
vec![],
1851+
Some(network_socket_b.local_addr()?.port()),
1852+
ws_api_port_socket_b.local_addr()?.port(),
1853+
)
1854+
.await?;
1855+
let public_port = cfg.network_api.public_port.unwrap();
1856+
let path = preset.temp_dir.path().to_path_buf();
1857+
(cfg, preset, gw_config(public_port, &path)?)
1858+
};
1859+
1860+
// Configure client node A
1861+
let (config_a, _preset_cfg_a) = base_node_test_config(
1862+
false,
1863+
vec![serde_json::to_string(&config_b_gw)?],
1864+
None,
1865+
ws_api_port_socket_a.local_addr()?.port(),
1866+
)
1867+
.await?;
1868+
let ws_api_port = config_a.ws_api.ws_api_port.unwrap();
1869+
1870+
// Free ports
1871+
std::mem::drop(ws_api_port_socket_a);
1872+
std::mem::drop(network_socket_b);
1873+
std::mem::drop(ws_api_port_socket_b);
1874+
1875+
// Start node A (client)
1876+
let node_a = async move {
1877+
let config = config_a.build().await?;
1878+
let node = NodeConfig::new(config.clone())
1879+
.await?
1880+
.build(serve_gateway(config.ws_api).await)
1881+
.await?;
1882+
node.run().await
1883+
}
1884+
.boxed_local();
1885+
1886+
// Start node B (gateway)
1887+
let node_b = async {
1888+
let config = config_b.build().await?;
1889+
let node = NodeConfig::new(config.clone())
1890+
.await?
1891+
.build(serve_gateway(config.ws_api).await)
1892+
.await?;
1893+
node.run().await
1894+
}
1895+
.boxed_local();
1896+
1897+
let test = tokio::time::timeout(Duration::from_secs(180), async {
1898+
// Wait for nodes to start up
1899+
tokio::time::sleep(Duration::from_secs(20)).await;
1900+
1901+
// Connect to node A websocket API
1902+
let uri =
1903+
format!("ws://127.0.0.1:{ws_api_port}/v1/contract/command?encodingProtocol=native");
1904+
let (stream, _) = connect_async(&uri).await?;
1905+
let mut client_api = WebApi::start(stream);
1906+
1907+
// PUT contract with subscribe:true
1908+
make_put(
1909+
&mut client_api,
1910+
wrapped_state.clone(),
1911+
contract.clone(),
1912+
true, // subscribe:true - this is what we're testing
1913+
)
1914+
.await?;
1915+
1916+
// Wait for PUT response
1917+
tracing::info!("Waiting for PUT response with subscribe:true...");
1918+
let resp = tokio::time::timeout(Duration::from_secs(30), client_api.recv()).await;
1919+
match resp {
1920+
Ok(Ok(HostResponse::ContractResponse(ContractResponse::PutResponse { key }))) => {
1921+
tracing::info!("PUT successful with subscribe:true for contract: {}", key);
1922+
assert_eq!(key, contract_key, "Contract key mismatch in PUT response");
1923+
}
1924+
Ok(Ok(other)) => {
1925+
bail!("Unexpected response while waiting for PUT: {:?}", other);
1926+
}
1927+
Ok(Err(e)) => {
1928+
bail!("Error receiving PUT response: {}", e);
1929+
}
1930+
Err(_) => {
1931+
bail!("Timeout waiting for PUT response");
1932+
}
1933+
}
1934+
1935+
// Small delay to ensure subscription is established
1936+
tokio::time::sleep(Duration::from_secs(2)).await;
1937+
1938+
// Now UPDATE the contract (this should work if subscribe:true worked correctly)
1939+
let mut todo_list: test_utils::TodoList = serde_json::from_slice(wrapped_state.as_ref())
1940+
.unwrap_or_else(|_| test_utils::TodoList {
1941+
tasks: Vec::new(),
1942+
version: 0,
1943+
});
1944+
1945+
// Add a task
1946+
todo_list.tasks.push(test_utils::Task {
1947+
id: 1,
1948+
title: "Test subscribe:true fix".to_string(),
1949+
description: "Verify UPDATE works after PUT with subscribe:true".to_string(),
1950+
completed: false,
1951+
priority: 5,
1952+
});
1953+
1954+
let updated_bytes = serde_json::to_vec(&todo_list).unwrap();
1955+
let updated_state = WrappedState::from(updated_bytes);
1956+
1957+
tracing::info!("Attempting UPDATE after PUT with subscribe:true...");
1958+
make_update(&mut client_api, contract_key, updated_state.clone()).await?;
1959+
1960+
// Wait for UPDATE response or notification
1961+
// We might receive an UpdateNotification if we're subscribed (which means our fix works!)
1962+
let mut update_confirmed = false;
1963+
let start = std::time::Instant::now();
1964+
1965+
while start.elapsed() < Duration::from_secs(30) && !update_confirmed {
1966+
let resp = tokio::time::timeout(Duration::from_secs(5), client_api.recv()).await;
1967+
match resp {
1968+
Ok(Ok(HostResponse::ContractResponse(ContractResponse::UpdateResponse {
1969+
key,
1970+
summary: _,
1971+
}))) => {
1972+
tracing::info!("UPDATE successful after PUT with subscribe:true!");
1973+
assert_eq!(
1974+
key, contract_key,
1975+
"Contract key mismatch in UPDATE response"
1976+
);
1977+
update_confirmed = true;
1978+
}
1979+
Ok(Ok(HostResponse::ContractResponse(ContractResponse::UpdateNotification {
1980+
key,
1981+
update: _,
1982+
}))) => {
1983+
tracing::info!("Received UpdateNotification - this confirms we're subscribed!");
1984+
assert_eq!(
1985+
key, contract_key,
1986+
"Contract key mismatch in UPDATE notification"
1987+
);
1988+
// Getting a notification means we're properly subscribed - our fix is working!
1989+
update_confirmed = true;
1990+
}
1991+
Ok(Ok(other)) => {
1992+
tracing::debug!("Received other response: {:?}", other);
1993+
// Continue waiting for the update response/notification
1994+
}
1995+
Ok(Err(e)) => {
1996+
bail!("Error receiving UPDATE response: {}", e);
1997+
}
1998+
Err(_) => {
1999+
// Timeout on this iteration, continue if we haven't exceeded total time
2000+
}
2001+
}
2002+
}
2003+
2004+
if !update_confirmed {
2005+
bail!("Did not receive UPDATE response or notification within timeout");
2006+
}
2007+
2008+
// Verify the state was actually updated with GET
2009+
make_get(&mut client_api, contract_key, true, false).await?;
2010+
2011+
let resp = tokio::time::timeout(Duration::from_secs(30), client_api.recv()).await;
2012+
match resp {
2013+
Ok(Ok(HostResponse::ContractResponse(ContractResponse::GetResponse {
2014+
key,
2015+
state,
2016+
contract: _,
2017+
}))) => {
2018+
assert_eq!(key, contract_key);
2019+
2020+
// Verify the task was added
2021+
let retrieved_list: test_utils::TodoList = serde_json::from_slice(state.as_ref())?;
2022+
assert_eq!(retrieved_list.tasks.len(), 1, "Task should have been added");
2023+
assert_eq!(retrieved_list.tasks[0].title, "Test subscribe:true fix");
2024+
2025+
tracing::info!(
2026+
"GET confirmed UPDATE was successful - subscribe:true fix is working!"
2027+
);
2028+
}
2029+
_ => {
2030+
bail!("Failed to verify updated state with GET");
2031+
}
2032+
}
2033+
2034+
Ok::<_, anyhow::Error>(())
2035+
});
2036+
2037+
// Wait for test completion or node failures
2038+
select! {
2039+
a = node_a => {
2040+
let Err(a) = a;
2041+
return Err(anyhow!("Node A failed: {}", a).into());
2042+
}
2043+
b = node_b => {
2044+
let Err(b) = b;
2045+
return Err(anyhow!("Node B failed: {}", b).into());
2046+
}
2047+
r = test => {
2048+
r??;
2049+
tokio::time::sleep(Duration::from_secs(3)).await;
2050+
}
2051+
}
2052+
2053+
Ok(())
2054+
}
2055+
18262056
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
18272057
async fn test_delegate_request() -> TestResult {
18282058
freenet::config::set_logger(Some(LevelFilter::INFO), None);

0 commit comments

Comments
 (0)