From e019807f95614af212c4ced84165b153a97c29a6 Mon Sep 17 00:00:00 2001 From: Dan Wiebe Date: Mon, 13 Oct 2025 22:27:05 -0400 Subject: [PATCH 1/5] Review issues --- node/src/dispatcher.rs | 12 +- node/src/hopper/consuming_service.rs | 10 +- node/src/hopper/mod.rs | 2 +- node/src/hopper/routing_service.rs | 44 ++++---- node/src/neighborhood/mod.rs | 18 +-- .../client_request_payload_factory.rs | 24 ++-- node/src/proxy_server/mod.rs | 105 +++++++++--------- node/src/stream_handler_pool.rs | 48 ++++---- node/src/stream_reader.rs | 18 +-- node/src/sub_lib/dispatcher.rs | 12 +- node/src/sub_lib/sequence_buffer.rs | 8 +- node/src/sub_lib/stream_handler_pool.rs | 2 +- 12 files changed, 147 insertions(+), 156 deletions(-) diff --git a/node/src/dispatcher.rs b/node/src/dispatcher.rs index e74d97e9d..4b01cd593 100644 --- a/node/src/dispatcher.rs +++ b/node/src/dispatcher.rs @@ -277,7 +277,7 @@ mod tests { timestamp: SystemTime::now(), client_addr, reception_port_opt, - sequence_number: Some(0), + sequence_number_opt: Some(0), last_data: false, is_clandestine: false, data: data.clone(), @@ -318,7 +318,7 @@ mod tests { reception_port_opt, last_data: false, is_clandestine: true, - sequence_number: None, + sequence_number_opt: None, data: data.clone(), }; let mut peer_actors = peer_actors_builder().hopper(hopper).build(); @@ -358,7 +358,7 @@ mod tests { reception_port_opt, last_data: false, is_clandestine: false, - sequence_number: Some(0), + sequence_number_opt: Some(0), data: data.clone(), }; @@ -384,7 +384,7 @@ mod tests { reception_port_opt, last_data: false, is_clandestine: true, - sequence_number: None, + sequence_number_opt: None, data: data.clone(), }; @@ -406,7 +406,7 @@ mod tests { let obcd = TransmitDataMsg { endpoint: Endpoint::Socket(socket_addr), last_data: false, - sequence_number: Some(0), + sequence_number_opt: Some(0), data: data.clone(), }; @@ -430,7 +430,7 @@ mod tests { let obcd = TransmitDataMsg { endpoint: Endpoint::Socket(socket_addr), last_data: false, - sequence_number: None, + sequence_number_opt: None, data: data.clone(), }; let mut peer_actors = peer_actors_builder().build(); diff --git a/node/src/hopper/consuming_service.rs b/node/src/hopper/consuming_service.rs index dc9d2e6f1..0eb34cb52 100644 --- a/node/src/hopper/consuming_service.rs +++ b/node/src/hopper/consuming_service.rs @@ -103,7 +103,7 @@ impl ConsumingService { reception_port_opt: None, last_data: false, is_clandestine: true, - sequence_number: None, + sequence_number_opt: None, data: encrypted_package.into(), }; debug!( @@ -119,7 +119,7 @@ impl ConsumingService { endpoint: next_stop, last_data: false, // Hopper-to-Hopper clandestine streams are never remotely killed data: encrypted_package.into(), - sequence_number: None, + sequence_number_opt: None, }; debug!( @@ -193,7 +193,7 @@ mod tests { &TransmitDataMsg { endpoint: Endpoint::Socket(SocketAddr::from_str("1.2.1.2:1212").unwrap()), last_data: false, - sequence_number: None, + sequence_number_opt: None, data: encodex(CRYPTDE_PAIR.main.as_ref(), &target_key, &lcp) .unwrap() .into(), @@ -267,7 +267,7 @@ mod tests { TransmitDataMsg { endpoint: Endpoint::Key(destination_key.clone()), last_data: false, - sequence_number: None, + sequence_number_opt: None, data: expected_lcp_enc.into(), }, *record, @@ -321,7 +321,7 @@ mod tests { reception_port_opt: None, last_data: false, is_clandestine: true, - sequence_number: None, + sequence_number_opt: None, data: expected_lcp_enc.into(), }, ); diff --git a/node/src/hopper/mod.rs b/node/src/hopper/mod.rs index 55b0c8e24..b2f3d6dc0 100644 --- a/node/src/hopper/mod.rs +++ b/node/src/hopper/mod.rs @@ -199,7 +199,7 @@ mod tests { reception_port_opt: None, last_data: false, is_clandestine: false, - sequence_number: None, + sequence_number_opt: None, data: encrypted_package, }; let system = System::new("panics_if_routing_service_is_unbound"); diff --git a/node/src/hopper/routing_service.rs b/node/src/hopper/routing_service.rs index 3e23b1bed..69a88aed4 100644 --- a/node/src/hopper/routing_service.rs +++ b/node/src/hopper/routing_service.rs @@ -189,7 +189,7 @@ impl RoutingService { reception_port_opt: ibcd_but_data.reception_port_opt, last_data: ibcd_but_data.last_data, is_clandestine: ibcd_but_data.is_clandestine, - sequence_number: ibcd_but_data.sequence_number, + sequence_number_opt: ibcd_but_data.sequence_number_opt, data: payload.into(), }; self.routing_service_subs @@ -495,7 +495,7 @@ impl RoutingService { endpoint: Endpoint::Key(next_hop.public_key), last_data, data: next_live_package_enc.into(), - sequence_number: None, + sequence_number_opt: None, }) } } @@ -571,7 +571,7 @@ mod tests { timestamp: SystemTime::now(), client_addr: SocketAddr::from_str("1.2.3.4:5678").unwrap(), reception_port_opt: None, - sequence_number: None, + sequence_number_opt: None, last_data: false, is_clandestine: false, data: data_enc.into(), @@ -632,7 +632,7 @@ mod tests { timestamp: SystemTime::now(), client_addr: SocketAddr::from_str("1.2.3.4:5678").unwrap(), reception_port_opt: None, - sequence_number: None, + sequence_number_opt: None, last_data: false, is_clandestine: false, data: data_enc.into(), @@ -675,7 +675,7 @@ mod tests { timestamp: SystemTime::now(), client_addr: SocketAddr::from_str("1.2.3.4:5678").unwrap(), reception_port_opt: None, - sequence_number: None, + sequence_number_opt: None, last_data: false, is_clandestine: false, data: data_enc.into(), @@ -727,7 +727,7 @@ mod tests { timestamp: SystemTime::now(), client_addr: SocketAddr::from_str("1.2.3.4:5678").unwrap(), reception_port_opt: None, - sequence_number: None, + sequence_number_opt: None, last_data: false, is_clandestine: false, data: data_enc.into(), @@ -777,7 +777,7 @@ mod tests { timestamp: SystemTime::now(), client_addr: SocketAddr::from_str("1.2.3.4:5678").unwrap(), reception_port_opt: None, - sequence_number: None, + sequence_number_opt: None, last_data: true, is_clandestine: false, data: data_enc.into(), @@ -846,7 +846,7 @@ mod tests { timestamp: SystemTime::now(), client_addr: SocketAddr::from_str("1.2.3.4:5678").unwrap(), reception_port_opt: None, - sequence_number: None, + sequence_number_opt: None, last_data: true, is_clandestine: false, data: data_enc.into(), @@ -904,7 +904,7 @@ mod tests { reception_port_opt: None, last_data: false, is_clandestine: true, - sequence_number: None, + sequence_number_opt: None, data: lcp_enc.into(), }; @@ -981,7 +981,7 @@ mod tests { reception_port_opt: None, last_data: false, is_clandestine: true, - sequence_number: None, + sequence_number_opt: None, data: data_enc.into(), }; @@ -1054,7 +1054,7 @@ mod tests { reception_port_opt: None, last_data: false, is_clandestine: true, - sequence_number: None, + sequence_number_opt: None, data: data_enc.into(), }; @@ -1130,7 +1130,7 @@ mod tests { reception_port_opt: None, last_data: true, is_clandestine: true, - sequence_number: None, + sequence_number_opt: None, data: data_enc.into(), }; @@ -1170,7 +1170,7 @@ mod tests { TransmitDataMsg { endpoint: Endpoint::Key(next_key.clone()), last_data: true, - sequence_number: None, + sequence_number_opt: None, data: expected_lcp_enc.into(), } ); @@ -1222,7 +1222,7 @@ mod tests { reception_port_opt: None, last_data: true, is_clandestine: true, - sequence_number: None, + sequence_number_opt: None, data: data_enc.into(), }; @@ -1265,7 +1265,7 @@ mod tests { reception_port_opt: None, last_data: true, is_clandestine: true, - sequence_number: None, + sequence_number_opt: None, data: expected_lcp_enc.into() } ); @@ -1304,7 +1304,7 @@ mod tests { reception_port_opt: None, last_data: true, is_clandestine: true, - sequence_number: None, + sequence_number_opt: None, data: data_enc.into(), }; let system = System::new( @@ -1401,7 +1401,7 @@ mod tests { reception_port_opt: None, last_data: true, is_clandestine: true, - sequence_number: None, + sequence_number_opt: None, data: data_enc.into(), }; let system = System::new( @@ -1577,7 +1577,7 @@ mod tests { reception_port_opt: None, last_data: true, is_clandestine: true, - sequence_number: None, + sequence_number_opt: None, data: data_enc.into(), }; let system = System::new("test"); @@ -1647,7 +1647,7 @@ mod tests { reception_port_opt: None, last_data: true, is_clandestine: true, - sequence_number: None, + sequence_number_opt: None, data: data_enc.into(), }; let system = System::new("test"); @@ -1691,7 +1691,7 @@ mod tests { reception_port_opt: None, last_data: true, is_clandestine: true, - sequence_number: None, + sequence_number_opt: None, data: vec![], }; let system = System::new("consume_logs_error_when_given_bad_input_data"); @@ -1748,7 +1748,7 @@ mod tests { reception_port_opt: None, last_data: true, is_clandestine: true, - sequence_number: None, + sequence_number_opt: None, data: data_enc.into(), }; let system = System::new("consume_logs_error_when_given_bad_input_data"); @@ -1815,7 +1815,7 @@ mod tests { reception_port_opt: None, last_data: true, is_clandestine: true, - sequence_number: None, + sequence_number_opt: None, data: vec![], }; diff --git a/node/src/neighborhood/mod.rs b/node/src/neighborhood/mod.rs index 355029a2e..ed7b7a259 100644 --- a/node/src/neighborhood/mod.rs +++ b/node/src/neighborhood/mod.rs @@ -1268,10 +1268,6 @@ impl Neighborhood { UndesirabilityType::Relay => { node_record.inner.rate_pack.routing_charge(payload_size) as i64 } - UndesirabilityType::ExitRequest("booga.com") => { - node_record.inner.rate_pack.exit_charge(payload_size) as i64 - + node_record.metadata.country_undesirability as i64 - } UndesirabilityType::ExitRequest(hostname) => { let exit_undesirability = node_record.inner.rate_pack.exit_charge(payload_size) as i64; @@ -2210,7 +2206,7 @@ mod tests { use crate::sub_lib::hop::LiveHop; use crate::sub_lib::hopper::MessageType; use crate::sub_lib::neighborhood::{ - AskAboutDebutGossipMessage, ConfigChange, ConfigChangeMsg, ExpectedServices, + AskAboutDebutGossipMessage, ConfigChange, ConfigChangeMsg, NeighborhoodMode, WalletPair, }; use crate::sub_lib::neighborhood::{NeighborhoodConfig, DEFAULT_RATE_PACK}; @@ -2240,8 +2236,6 @@ mod tests { prove_that_crash_request_handler_is_hooked_up, AssertionsMessage, }; use crate::test_utils::vec_to_set; - - use super::*; use crate::accountant::test_utils::bc_from_earning_wallet; use crate::bootstrapper::CryptDEPair; use crate::neighborhood::overall_connection_status::ConnectionStageErrors::{ @@ -6624,7 +6618,7 @@ mod tests { context: TransmitDataMsg { endpoint: Endpoint::Key(cryptde.public_key().clone()), last_data: false, - sequence_number: None, + sequence_number_opt: None, data: Vec::new(), }, recipient, @@ -6687,7 +6681,7 @@ mod tests { context: TransmitDataMsg { endpoint: Endpoint::Key(cryptde.public_key().clone()), last_data: false, - sequence_number: None, + sequence_number_opt: None, data: Vec::new(), }, recipient, @@ -6716,7 +6710,7 @@ mod tests { let context = TransmitDataMsg { endpoint: Endpoint::Key(cryptde.public_key().clone()), last_data: false, - sequence_number: None, + sequence_number_opt: None, data: Vec::new(), }; let context_a = context.clone(); @@ -6816,7 +6810,7 @@ mod tests { context: TransmitDataMsg { endpoint: Endpoint::Key(cryptde.public_key().clone()), last_data: false, - sequence_number: None, + sequence_number_opt: None, data: Vec::new(), }, recipient, @@ -6844,7 +6838,7 @@ mod tests { let context = TransmitDataMsg { endpoint: Endpoint::Key(cryptde.public_key().clone()), last_data: false, - sequence_number: None, + sequence_number_opt: None, data: Vec::new(), }; let context_a = context.clone(); diff --git a/node/src/proxy_server/client_request_payload_factory.rs b/node/src/proxy_server/client_request_payload_factory.rs index 48479312c..a68fc4bac 100644 --- a/node/src/proxy_server/client_request_payload_factory.rs +++ b/node/src/proxy_server/client_request_payload_factory.rs @@ -55,7 +55,7 @@ impl ClientRequestPayloadFactory for ClientRequestPayloadFactoryReal { } }, }; - let sequence_number = match ibcd.sequence_number { + let sequence_number = match ibcd.sequence_number_opt { Some(sequence_number) => sequence_number, None => { error!( @@ -111,7 +111,7 @@ mod tests { timestamp: SystemTime::now(), client_addr: SocketAddr::from_str("1.2.3.4:5678").unwrap(), reception_port_opt: Some(HTTP_PORT), - sequence_number: Some(1), + sequence_number_opt: Some(1), last_data: false, is_clandestine: false, data: data.clone().into(), @@ -143,7 +143,7 @@ mod tests { timestamp: SystemTime::now(), client_addr: SocketAddr::from_str("1.2.3.4:5678").unwrap(), reception_port_opt: Some(HTTP_PORT), - sequence_number: Some(1), + sequence_number_opt: Some(1), last_data: false, is_clandestine: false, data: data.into(), @@ -176,7 +176,7 @@ mod tests { timestamp: SystemTime::now(), client_addr: SocketAddr::from_str("1.2.3.4:5678").unwrap(), reception_port_opt: Some(HTTP_PORT), - sequence_number: Some(1), + sequence_number_opt: Some(1), last_data: false, is_clandestine: false, data: data.into(), @@ -199,7 +199,7 @@ mod tests { timestamp: SystemTime::now(), client_addr: SocketAddr::from_str("1.2.3.4:5678").unwrap(), reception_port_opt: Some(HTTP_PORT), - sequence_number: Some(1), + sequence_number_opt: Some(1), last_data: false, is_clandestine: false, data: data.clone().into(), @@ -236,7 +236,7 @@ mod tests { timestamp: SystemTime::now(), client_addr: SocketAddr::from_str("1.2.3.4:5678").unwrap(), reception_port_opt: Some(HTTP_PORT), - sequence_number: Some(1), + sequence_number_opt: Some(1), last_data: false, is_clandestine: false, data: data.clone().into(), @@ -290,7 +290,7 @@ mod tests { let ibcd = InboundClientData { timestamp: SystemTime::now(), client_addr: SocketAddr::from_str("1.2.3.4:5678").unwrap(), - sequence_number: Some(0), + sequence_number_opt: Some(0), reception_port_opt: Some(443), last_data: false, is_clandestine: false, @@ -344,7 +344,7 @@ mod tests { reception_port_opt: Some(443), last_data: true, is_clandestine: false, - sequence_number: Some(0), + sequence_number_opt: Some(0), data: data.clone().into(), }; let cryptde = CRYPTDE_PAIR.main.as_ref(); @@ -365,7 +365,7 @@ mod tests { let ibcd = InboundClientData { timestamp: SystemTime::now(), client_addr: SocketAddr::from_str("1.2.3.4:5678").unwrap(), - sequence_number: Some(0), + sequence_number_opt: Some(0), reception_port_opt: None, last_data: false, is_clandestine: false, @@ -392,7 +392,7 @@ mod tests { timestamp: SystemTime::now(), client_addr: SocketAddr::from_str("1.2.3.4:5678").unwrap(), reception_port_opt: Some(1234), - sequence_number: Some(0), + sequence_number_opt: Some(0), last_data: false, is_clandestine: true, data: vec![0x10, 0x11, 0x12], @@ -415,7 +415,7 @@ mod tests { timestamp: SystemTime::now(), client_addr: SocketAddr::from_str("1.2.3.4:80").unwrap(), reception_port_opt: Some(HTTP_PORT), - sequence_number: Some(1), + sequence_number_opt: Some(1), last_data: false, data: data.into(), is_clandestine: false, @@ -448,7 +448,7 @@ mod tests { reception_port_opt: Some(HTTP_PORT), last_data: false, is_clandestine: false, - sequence_number: None, + sequence_number_opt: None, data: data.into(), }; let cryptde = CRYPTDE_PAIR.main.as_ref(); diff --git a/node/src/proxy_server/mod.rs b/node/src/proxy_server/mod.rs index 491cb1111..5f794c921 100644 --- a/node/src/proxy_server/mod.rs +++ b/node/src/proxy_server/mod.rs @@ -317,7 +317,7 @@ impl ProxyServer { .try_send(TransmitDataMsg { endpoint: Endpoint::Socket(client_addr), last_data: true, - sequence_number: Some(0), // DNS resolution errors always happen on the first request + sequence_number_opt: Some(0), // DNS resolution errors always happen on the first request data: from_protocol(proxy_protocol) .server_impersonator() .dns_resolution_failure_response(hostname), @@ -641,7 +641,7 @@ impl ProxyServer { .try_send(TransmitDataMsg { endpoint: Endpoint::Socket(socket_addr), last_data, - sequence_number, + sequence_number_opt: Some(response.sequenced_packet.sequence_number), data: response.sequenced_packet.data, }) .expect("Dispatcher is dead"); @@ -683,7 +683,7 @@ impl ProxyServer { .try_send(TransmitDataMsg { endpoint: Endpoint::Socket(msg.client_addr), last_data: false, - sequence_number: msg.sequence_number, + sequence_number_opt: msg.sequence_number_opt, data: b"HTTP/1.1 200 OK\r\n\r\n".to_vec(), }) .expect("Dispatcher is dead"); @@ -696,7 +696,7 @@ impl ProxyServer { .try_send(TransmitDataMsg { endpoint: Endpoint::Socket(msg.client_addr), last_data: true, - sequence_number: msg.sequence_number, + sequence_number_opt: msg.sequence_number_opt, data: b"HTTP/1.1 400 Bad Request\r\nContent-Length: 0\r\n\r\n".to_vec(), }) .expect("Dispatcher is dead"); @@ -740,7 +740,7 @@ impl ProxyServer { reception_port_opt: Some(nca.reception_port), last_data: true, is_clandestine: false, - sequence_number: Some(nca.sequence_number), + sequence_number_opt: Some(nca.sequence_number), data: vec![], }; if let Err(e) = @@ -1014,7 +1014,7 @@ impl ProxyServer { let msg = TransmitDataMsg { endpoint: Endpoint::Socket(source_addr), last_data: true, - sequence_number: Some(0), + sequence_number_opt: Some(0), data, }; dispatcher.try_send(msg).expect("Dispatcher is dead"); @@ -1215,7 +1215,7 @@ impl IBCDHelper for IBCDHelperReal { let msg = TransmitDataMsg { endpoint: Endpoint::Socket(client_addr), last_data: true, - sequence_number: Some(0), + sequence_number_opt: Some(0), data, }; proxy_server @@ -1852,7 +1852,7 @@ mod tests { timestamp: SystemTime::now(), client_addr: socket_addr, reception_port_opt: Some(HTTP_PORT), - sequence_number: Some(0), + sequence_number_opt: Some(0), last_data: true, is_clandestine: false, data: expected_data.clone(), @@ -1967,7 +1967,7 @@ mod tests { timestamp: SystemTime::now(), client_addr: socket_addr.clone(), reception_port_opt: Some(8443), - sequence_number: Some(0), + sequence_number_opt: Some(0), last_data: false, is_clandestine: false, data: request_data.clone(), @@ -1976,7 +1976,7 @@ mod tests { timestamp: SystemTime::now(), client_addr: socket_addr, reception_port_opt: Some(8443), - sequence_number: Some(0), + sequence_number_opt: Some(0), last_data: false, is_clandestine: false, data: tunneled_data.clone(), @@ -1984,7 +1984,7 @@ mod tests { let expected_tdm = TransmitDataMsg { endpoint: Endpoint::Socket(socket_addr), last_data: false, - sequence_number: Some(0), + sequence_number_opt: Some(0), data: b"HTTP/1.1 200 OK\r\n\r\n".to_vec(), }; let expected_payload = ClientRequestPayload_0v1 { @@ -2112,7 +2112,7 @@ mod tests { reception_port_opt: Some(443), last_data: false, is_clandestine: false, - sequence_number: Some(0), + sequence_number_opt: Some(0), data: request_data, }; @@ -2148,7 +2148,7 @@ mod tests { let dispatcher_recording = dispatcher_log_arc.lock().unwrap(); let record = dispatcher_recording.get_record::(1); - assert_eq!(record.sequence_number.unwrap(), 1); + assert_eq!(record.sequence_number_opt.unwrap(), 1); } #[test] @@ -2171,7 +2171,7 @@ mod tests { timestamp: SystemTime::now(), client_addr: socket_addr.clone(), reception_port_opt: Some(8443), - sequence_number: Some(0), + sequence_number_opt: Some(0), last_data: false, is_clandestine: false, data: request_data.clone(), @@ -2212,7 +2212,7 @@ mod tests { let expected_transmit_data_msg = TransmitDataMsg { endpoint: Endpoint::Socket(socket_addr), last_data: true, - sequence_number: Some(0), + sequence_number_opt: Some(0), data: b"HTTP/1.1 400 Bad Request\r\nContent-Length: 0\r\n\r\n".to_vec(), }; @@ -2242,7 +2242,7 @@ mod tests { timestamp: SystemTime::now(), client_addr: socket_addr.clone(), reception_port_opt: Some(8443), - sequence_number: Some(0), + sequence_number_opt: Some(0), last_data: false, is_clandestine: false, data: request_data.clone(), @@ -2283,7 +2283,7 @@ mod tests { let expected_transmit_data_msg = TransmitDataMsg { endpoint: Endpoint::Socket(socket_addr), last_data: true, - sequence_number: Some(0), + sequence_number_opt: Some(0), data: b"HTTP/1.1 400 Bad Request\r\nContent-Length: 0\r\n\r\n".to_vec(), }; @@ -2308,7 +2308,7 @@ mod tests { timestamp: SystemTime::now(), client_addr: socket_addr.clone(), reception_port_opt: Some(HTTP_PORT), - sequence_number: Some(0), + sequence_number_opt: Some(0), last_data: true, is_clandestine: false, data: expected_data.clone(), @@ -2342,7 +2342,7 @@ mod tests { &TransmitDataMsg { endpoint: Endpoint::Socket(socket_addr), last_data: true, - sequence_number: Some(0), + sequence_number_opt: Some(0), data: server_impersonator.consuming_wallet_absent(), } ); @@ -2366,7 +2366,7 @@ mod tests { timestamp: SystemTime::now(), client_addr: socket_addr.clone(), reception_port_opt: Some(TLS_PORT), - sequence_number: Some(0), + sequence_number_opt: Some(0), last_data: true, is_clandestine: false, data: expected_data.clone(), @@ -2400,7 +2400,7 @@ mod tests { &TransmitDataMsg { endpoint: Endpoint::Socket(socket_addr), last_data: true, - sequence_number: Some(0), + sequence_number_opt: Some(0), data: server_impersonator.consuming_wallet_absent(), } ); @@ -2430,7 +2430,7 @@ mod tests { timestamp: SystemTime::now(), client_addr: socket_addr.clone(), reception_port_opt: Some(HTTP_PORT), - sequence_number: Some(0), + sequence_number_opt: Some(0), last_data: true, is_clandestine: false, data: expected_data_inner, @@ -2514,7 +2514,7 @@ mod tests { timestamp: SystemTime::now(), client_addr: socket_addr.clone(), reception_port_opt: Some(TLS_PORT), - sequence_number: Some(0), + sequence_number_opt: Some(0), last_data: true, is_clandestine: false, data: expected_data_inner, @@ -2594,7 +2594,7 @@ mod tests { timestamp: SystemTime::now(), client_addr: socket_addr.clone(), reception_port_opt: Some(HTTP_PORT), - sequence_number: Some(0), + sequence_number_opt: Some(0), last_data: true, is_clandestine: false, data: expected_data.clone(), @@ -2715,7 +2715,7 @@ mod tests { timestamp: SystemTime::now(), client_addr: socket_addr.clone(), reception_port_opt: Some(HTTP_PORT), - sequence_number: Some(0), + sequence_number_opt: Some(0), last_data: true, is_clandestine: false, data: expected_data.clone(), @@ -2805,7 +2805,7 @@ mod tests { timestamp: SystemTime::now(), client_addr: socket_addr.clone(), reception_port_opt: Some(HTTP_PORT), - sequence_number: Some(0), + sequence_number_opt: Some(0), last_data: true, is_clandestine: false, data: expected_data.clone(), @@ -2865,7 +2865,7 @@ mod tests { timestamp: SystemTime::now(), client_addr: socket_addr.clone(), reception_port_opt: Some(HTTP_PORT), - sequence_number: Some(0), + sequence_number_opt: Some(0), last_data: true, is_clandestine: false, data: expected_data.clone(), @@ -2917,7 +2917,7 @@ mod tests { timestamp: SystemTime::now(), client_addr: socket_addr.clone(), reception_port_opt: Some(HTTP_PORT), - sequence_number: Some(0), + sequence_number_opt: Some(0), last_data: true, is_clandestine: false, data: expected_data.clone(), @@ -2969,7 +2969,7 @@ mod tests { timestamp: SystemTime::now(), client_addr: socket_addr, reception_port_opt: Some(HTTP_PORT), - sequence_number: Some(0), + sequence_number_opt: Some(0), last_data: true, is_clandestine: false, data: expected_data.clone(), @@ -3299,7 +3299,7 @@ mod tests { timestamp: SystemTime::now(), client_addr: socket_addr.clone(), reception_port_opt: Some(HTTP_PORT), - sequence_number: Some(0), + sequence_number_opt: Some(0), last_data: true, is_clandestine: false, data: expected_data.clone(), @@ -3426,7 +3426,7 @@ mod tests { timestamp: SystemTime::now(), client_addr: socket_addr.clone(), reception_port_opt: Some(HTTP_PORT), - sequence_number: Some(0), + sequence_number_opt: Some(0), last_data: true, data: expected_data.clone(), is_clandestine: false, @@ -3462,7 +3462,7 @@ mod tests { let expected_msg = TransmitDataMsg { endpoint: Endpoint::Socket(SocketAddr::from_str("1.2.3.4:5678").unwrap()), last_data: true, - sequence_number: Some(0), + sequence_number_opt: Some(0), data: ServerImpersonatorHttp {}.route_query_failure_response("nowhere.com"), }; assert_eq!(record, &expected_msg); @@ -3597,7 +3597,7 @@ mod tests { timestamp: SystemTime::now(), client_addr: socket_addr.clone(), reception_port_opt: Some(HTTP_PORT), - sequence_number: Some(0), + sequence_number_opt: Some(0), last_data: true, data: expected_data.clone(), is_clandestine: false, @@ -3634,7 +3634,7 @@ mod tests { let expected_msg = TransmitDataMsg { endpoint: Endpoint::Socket(SocketAddr::from_str("1.2.3.4:5678").unwrap()), last_data: true, - sequence_number: Some(0), + sequence_number_opt: Some(0), data: ServerImpersonatorHttp {}.route_query_failure_response("nowhere.com"), }; assert_eq!(record, &expected_msg); @@ -3676,7 +3676,7 @@ mod tests { timestamp: SystemTime::now(), client_addr: socket_addr.clone(), reception_port_opt: Some(TLS_PORT), - sequence_number: Some(0), + sequence_number_opt: Some(0), last_data: false, is_clandestine: false, data: expected_data.clone(), @@ -3761,7 +3761,7 @@ mod tests { timestamp: SystemTime::now(), client_addr: socket_addr.clone(), reception_port_opt: Some(TLS_PORT), - sequence_number: Some(0), + sequence_number_opt: Some(0), last_data: false, is_clandestine: false, data: expected_data.clone(), @@ -3860,7 +3860,7 @@ mod tests { timestamp: SystemTime::now(), client_addr, reception_port_opt: Some(TLS_PORT), - sequence_number: Some(0), + sequence_number_opt: Some(0), last_data: true, is_clandestine: false, data: expected_data.clone(), @@ -3963,7 +3963,7 @@ mod tests { timestamp: SystemTime::now(), client_addr: socket_addr.clone(), reception_port_opt: Some(TLS_PORT), - sequence_number: Some(0), + sequence_number_opt: Some(0), last_data: true, data: tls_request, is_clandestine: false, @@ -3997,7 +3997,7 @@ mod tests { let expected_msg = TransmitDataMsg { endpoint: Endpoint::Socket(SocketAddr::from_str("1.2.3.4:5678").unwrap()), last_data: true, - sequence_number: Some(0), + sequence_number_opt: Some(0), data: ServerImpersonatorTls {}.route_query_failure_response("ignored"), }; assert_eq!(record, &expected_msg); @@ -4833,7 +4833,7 @@ mod tests { TransmitDataMsg { endpoint: Endpoint::Socket(socket_addr), last_data: true, - sequence_number: Some(0), + sequence_number_opt: Some(0), data: ServerImpersonatorHttp {} .dns_resolution_failure_response("server.com".to_string()), }, @@ -5322,7 +5322,7 @@ mod tests { TransmitDataMsg { endpoint: Endpoint::Socket(socket_addr), last_data: true, - sequence_number: Some(0), + sequence_number_opt: Some(0), data: ServerImpersonatorHttp {} .dns_resolution_failure_response("server.com".to_string()), }, @@ -5695,7 +5695,7 @@ mod tests { timestamp: SystemTime::now(), client_addr: socket_addr.clone(), reception_port_opt: Some(80), - sequence_number: Some(0), + sequence_number_opt: Some(0), last_data: false, is_clandestine: false, data: expected_data.clone(), @@ -5761,7 +5761,6 @@ mod tests { #[test] fn handle_stream_shutdown_msg_handles_unknown_peer_addr() { let test_name = "handle_stream_shutdown_msg_handles_unknown_peer_addr"; - let logger = Logger::new(test_name); let mut subject = ProxyServer::new(CRYPTDE_PAIR.clone(), true, None, false, false); let unaffected_socket_addr = SocketAddr::from_str("2.3.4.5:6789").unwrap(); let unaffected_stream_key = StreamKey::make_meaningful_stream_key("unaffected"); @@ -6149,7 +6148,7 @@ mod tests { reception_port_opt: None, last_data: true, is_clandestine: false, - sequence_number: Some(123), + sequence_number_opt: Some(123), data: vec![], }; @@ -6287,7 +6286,7 @@ mod tests { reception_port_opt: Some(568), last_data: true, is_clandestine: false, - sequence_number: Some(123), + sequence_number_opt: Some(123), data: vec![], }; @@ -6306,7 +6305,6 @@ mod tests { #[test] fn new_http_request_creates_new_entry_inside_dns_retries_hashmap() { let test_name = "new_http_request_creates_new_entry_inside_dns_retries_hashmap"; - let logger = Logger::new(test_name); let http_request = b"GET /index.html HTTP/1.1\r\nHost: nowhere.com\r\n\r\n"; let (neighborhood_mock, _, _) = make_recorder(); let destination_key = PublicKey::from(&b"our destination"[..]); @@ -6325,7 +6323,7 @@ mod tests { timestamp: SystemTime::now(), client_addr: socket_addr.clone(), reception_port_opt: Some(HTTP_PORT), - sequence_number: Some(0), + sequence_number_opt: Some(0), last_data: true, is_clandestine: false, data: expected_data.clone(), @@ -6381,7 +6379,6 @@ mod tests { fn new_http_request_creates_new_exhausted_entry_inside_dns_retries_hashmap_zero_hop() { let test_name = "new_http_request_creates_new_exhausted_entry_inside_dns_retries_hashmap_zero_hop"; - let logger = Logger::new(test_name); let http_request = b"GET /index.html HTTP/1.1\r\nHost: nowhere.com\r\n\r\n"; let (neighborhood_mock, _, _) = make_recorder(); let destination_key = PublicKey::from(&b"our destination"[..]); @@ -6400,7 +6397,7 @@ mod tests { timestamp: SystemTime::now(), client_addr: socket_addr.clone(), reception_port_opt: Some(HTTP_PORT), - sequence_number: Some(0), + sequence_number_opt: Some(0), last_data: true, is_clandestine: false, data: expected_data.clone(), @@ -6535,7 +6532,7 @@ mod tests { reception_port_opt: Some(80), last_data: true, is_clandestine: false, - sequence_number: Some(123), + sequence_number_opt: Some(123), data: expected_data, }; @@ -6570,7 +6567,7 @@ mod tests { reception_port_opt: Some(HTTP_PORT), last_data: false, is_clandestine: false, - sequence_number: Some(123), + sequence_number_opt: Some(123), data: vec![], }, &stream_key, @@ -6613,7 +6610,7 @@ mod tests { reception_port_opt: Some(HTTP_PORT), last_data: false, is_clandestine: false, - sequence_number: Some(123), + sequence_number_opt: Some(123), data: vec![], }, &stream_key, @@ -6666,7 +6663,7 @@ mod tests { reception_port_opt: Some(2222), last_data: true, is_clandestine: false, - sequence_number: Some(333), + sequence_number_opt: Some(333), data: b"GET /index.html HTTP/1.1\r\nHost: header.com:3333\r\n\r\n".to_vec(), }; @@ -6690,7 +6687,7 @@ mod tests { reception_port_opt: Some(2222), last_data: true, is_clandestine: false, - sequence_number: Some(333), + sequence_number_opt: Some(333), data: b"GET /index.html HTTP/1.1\r\nHost: header.com:4444\r\n\r\n".to_vec(), }; diff --git a/node/src/stream_handler_pool.rs b/node/src/stream_handler_pool.rs index bc362ca18..b759a703c 100644 --- a/node/src/stream_handler_pool.rs +++ b/node/src/stream_handler_pool.rs @@ -516,7 +516,7 @@ impl StreamHandlerPool { sw_key ); debug!(self.logger, "Masking {} bytes", msg.context.data.len()); - let packet = if msg.context.sequence_number.is_none() { + let packet = if msg.context.sequence_number_opt.is_none() { let masquerader = self.traffic_analyzer.get_masquerader(); match masquerader.mask(msg.context.data.as_slice()) { Ok(masked_data) => SequencedPacket::new(masked_data, 0, false), @@ -925,7 +925,7 @@ mod tests { reception_port_opt, last_data: false, is_clandestine, - sequence_number: Some(0), + sequence_number_opt: Some(0), data: one_http_req_a, } ); @@ -939,7 +939,7 @@ mod tests { reception_port_opt, last_data: false, is_clandestine, - sequence_number: Some(1), + sequence_number_opt: Some(1), data: another_http_req_a, } ); @@ -953,7 +953,7 @@ mod tests { reception_port_opt, last_data: false, is_clandestine, - sequence_number: Some(2), + sequence_number_opt: Some(2), data: a_third_http_req_a, } ); @@ -1025,7 +1025,7 @@ mod tests { .try_send(TransmitDataMsg { endpoint: Endpoint::Socket(peer_addr), last_data: true, - sequence_number: Some(0), + sequence_number_opt: Some(0), data: b"hello".to_vec(), }) .unwrap(); @@ -1107,7 +1107,7 @@ mod tests { .try_send(TransmitDataMsg { endpoint: Endpoint::Socket(peer_addr), last_data: true, - sequence_number: Some(0), + sequence_number_opt: Some(0), data: vec![0x12, 0x34], }) .unwrap(); @@ -1130,7 +1130,7 @@ mod tests { .try_send(TransmitDataMsg { endpoint: Endpoint::Socket(peer_addr), last_data: true, - sequence_number: Some(0), + sequence_number_opt: Some(0), data: vec![0x56, 0x78], }) .unwrap(); @@ -1209,7 +1209,7 @@ mod tests { .try_send(TransmitDataMsg { endpoint: Endpoint::Socket(peer_addr), last_data: true, - sequence_number: Some(0), + sequence_number_opt: Some(0), data: vec![0x12, 0x34], }) .unwrap(); @@ -1381,7 +1381,7 @@ mod tests { context: TransmitDataMsg { endpoint: Endpoint::Key(public_key), last_data: false, - sequence_number: None, + sequence_number_opt: None, data: b"hello".to_vec(), }, }) @@ -1470,7 +1470,7 @@ mod tests { .try_send(TransmitDataMsg { endpoint: Endpoint::Key(public_key.clone()), last_data: false, - sequence_number: None, + sequence_number_opt: None, data: outgoing_unmasked, }) .unwrap(); @@ -1508,7 +1508,7 @@ mod tests { reception_port_opt: Some(54321), last_data: false, is_clandestine: true, - sequence_number: None, + sequence_number_opt: None, data: incoming_unmasked, } ); @@ -1586,7 +1586,7 @@ mod tests { .try_send(TransmitDataMsg { endpoint: Endpoint::Key(key.clone()), last_data: false, - sequence_number: Some(0), + sequence_number_opt: Some(0), data: b"hello".to_vec(), }) .unwrap(); @@ -1646,7 +1646,7 @@ mod tests { context: TransmitDataMsg { endpoint: Endpoint::Key(key.clone()), last_data: false, - sequence_number: Some(0), + sequence_number_opt: Some(0), data: b"hello".to_vec(), }, }) @@ -1698,7 +1698,7 @@ mod tests { context: TransmitDataMsg { endpoint: Endpoint::Key(key.clone()), last_data: false, - sequence_number: None, + sequence_number_opt: None, data: b"hello".to_vec(), }, }) @@ -1728,7 +1728,7 @@ mod tests { let msg = TransmitDataMsg { endpoint: Endpoint::Socket(peer_addr.clone()), last_data: false, - sequence_number: Some(0), + sequence_number_opt: Some(0), data: b"hello".to_vec(), }; let msg_a = msg.clone(); @@ -1840,7 +1840,7 @@ mod tests { context: TransmitDataMsg { endpoint: Endpoint::Socket(peer_addr.clone()), last_data: true, - sequence_number: Some(0), + sequence_number_opt: Some(0), data: b"hello".to_vec(), }, }; @@ -1869,13 +1869,13 @@ mod tests { let msg = TransmitDataMsg { endpoint: Endpoint::Socket(peer_addr.clone()), last_data: false, - sequence_number: None, + sequence_number_opt: None, data: b"hello".to_vec(), }; let msg_a = TransmitDataMsg { endpoint: Endpoint::Socket(peer_addr.clone()), last_data: false, - sequence_number: None, + sequence_number_opt: None, data: b"worlds".to_vec(), }; let expected_data = JsonMasquerader::new().mask(&msg_a.data).unwrap(); @@ -1995,7 +1995,7 @@ mod tests { context: TransmitDataMsg { endpoint: Endpoint::Socket(peer_addr.clone()), last_data: true, - sequence_number: Some(0), + sequence_number_opt: Some(0), data: b"hello".to_vec(), }, }); @@ -2035,7 +2035,7 @@ mod tests { let msg = TransmitDataMsg { endpoint: Endpoint::Socket(peer_addr.clone()), last_data: false, - sequence_number: None, + sequence_number_opt: None, data: b"hello".to_vec(), }; @@ -2126,7 +2126,7 @@ mod tests { .try_send(TransmitDataMsg { endpoint: Endpoint::Socket(peer_addr), last_data: false, - sequence_number: None, + sequence_number_opt: None, data: hello, }) .unwrap(); @@ -2136,7 +2136,7 @@ mod tests { .try_send(TransmitDataMsg { endpoint: Endpoint::Socket(peer_addr), last_data: false, - sequence_number: None, + sequence_number_opt: None, data: worlds, }) .unwrap(); @@ -2200,7 +2200,7 @@ mod tests { .try_send(TransmitDataMsg { endpoint: Endpoint::Socket(peer_addr), last_data: false, - sequence_number: None, + sequence_number_opt: None, data: b"hello".to_vec(), }) .unwrap(); @@ -2249,7 +2249,7 @@ mod tests { .try_send(TransmitDataMsg { endpoint: Endpoint::Socket(local_addr), last_data: false, - sequence_number: Some(0), + sequence_number_opt: Some(0), data: outgoing_unmasked, }) .unwrap(); diff --git a/node/src/stream_reader.rs b/node/src/stream_reader.rs index 0cab54033..df8dd8f33 100644 --- a/node/src/stream_reader.rs +++ b/node/src/stream_reader.rs @@ -137,7 +137,7 @@ impl StreamReaderReal { // handshake and should start the sequence at Some(0) as well, the ProxyServer will // handle the sequenced packet offset before sending them through the stream_writer // and avoid dropping duplicate packets. - let sequence_number = if unmasked_chunk.sequenced && !is_connect { + let sequence_number_opt = if unmasked_chunk.sequenced && !is_connect { Some(self.sequencer.next_sequence_number()) } else if is_connect { // This case needs to explicitly be Some(0) instead of None so that the StreamHandlerPool does @@ -146,7 +146,7 @@ impl StreamReaderReal { } else { None }; - match sequence_number { + match sequence_number_opt { Some(num) => debug!( self.logger, "Read {} bytes of clear data (#{})", @@ -165,7 +165,7 @@ impl StreamReaderReal { reception_port_opt: self.reception_port_opt, last_data: false, is_clandestine: self.is_clandestine, - sequence_number, + sequence_number_opt, data: unmasked_chunk.chunk.clone(), }; debug!(self.logger, "Discriminator framed and unmasked {} bytes for {}; transmitting via Hopper", @@ -514,7 +514,7 @@ mod tests { reception_port_opt: Some(1234 as u16), last_data: false, is_clandestine: true, - sequence_number: Some(0), + sequence_number_opt: Some(0), data: Vec::from("GET http://here.com HTTP/1.1\r\n\r\n".as_bytes()), } ); @@ -575,13 +575,13 @@ mod tests { Some(0), d_recording .get_record::(0) - .sequence_number, + .sequence_number_opt, ); assert_eq!( Some(0), d_recording .get_record::(1) - .sequence_number, + .sequence_number_opt, ); } @@ -636,7 +636,7 @@ mod tests { reception_port_opt: Some(1234 as u16), last_data: false, is_clandestine: false, - sequence_number: Some(0), + sequence_number_opt: Some(0), data: Vec::from("GET http://here.com HTTP/1.1\r\n\r\n".as_bytes()), } ); @@ -651,7 +651,7 @@ mod tests { reception_port_opt: Some(1234 as u16), last_data: false, is_clandestine: false, - sequence_number: Some(1), + sequence_number_opt: Some(1), data: Vec::from("GET http://www.example.com HTTP/1.1\r\n\r\n".as_bytes()), } ); @@ -710,7 +710,7 @@ mod tests { reception_port_opt: Some(1234 as u16), last_data: false, is_clandestine: true, - sequence_number: None, + sequence_number_opt: None, data: Vec::from("GET http://here.com HTTP/1.1\r\n\r\n".as_bytes()), } ); diff --git a/node/src/sub_lib/dispatcher.rs b/node/src/sub_lib/dispatcher.rs index 6ad7678b1..e04401220 100644 --- a/node/src/sub_lib/dispatcher.rs +++ b/node/src/sub_lib/dispatcher.rs @@ -119,7 +119,7 @@ pub struct InboundClientData { pub reception_port_opt: Option, pub last_data: bool, pub is_clandestine: bool, - pub sequence_number: Option, + pub sequence_number_opt: Option, pub data: Vec, } @@ -130,7 +130,7 @@ impl Debug for InboundClientData { Err(_) => self.data.hex_dump().to_string(), }; write!(f, "InboundClientData {{ peer_addr: {:?}, reception_port: {:?}, last_data: {}, sequence_number: {:?}, {} bytes of data: {} }}", - self.client_addr, self.reception_port_opt, self.last_data, self.sequence_number, self.data.len(), data_string) + self.client_addr, self.reception_port_opt, self.last_data, self.sequence_number_opt, self.data.len(), data_string) } } @@ -142,7 +142,7 @@ impl InboundClientData { reception_port_opt: self.reception_port_opt, last_data: self.last_data, is_clandestine: self.is_clandestine, - sequence_number: self.sequence_number, + sequence_number_opt: self.sequence_number_opt, data: vec![], } } @@ -277,7 +277,7 @@ mod tests { reception_port_opt: None, last_data: false, is_clandestine: false, - sequence_number: None, + sequence_number_opt: None, data: b"CONNECT server.example.com:80 HTTP/1.1\r\nHost: server.example.com:80\r\nProxy-Authorization: basic aGVsbG86d29ybGQ=\r\n\r\n".to_vec(), }; @@ -292,7 +292,7 @@ mod tests { reception_port_opt: None, last_data: false, is_clandestine: false, - sequence_number: None, + sequence_number_opt: None, data: b"GET server.example.com:80 HTTP/1.1\r\nHost: server.example.com:80\r\nProxy-Authorization: basic aGVsbG86d29ybGQ=\r\n\r\n".to_vec(), }; @@ -307,7 +307,7 @@ mod tests { reception_port_opt: None, last_data: false, is_clandestine: false, - sequence_number: None, + sequence_number_opt: None, data: b"CONNECTX".to_vec(), }; diff --git a/node/src/sub_lib/sequence_buffer.rs b/node/src/sub_lib/sequence_buffer.rs index c73dbc021..2cb101c41 100644 --- a/node/src/sub_lib/sequence_buffer.rs +++ b/node/src/sub_lib/sequence_buffer.rs @@ -38,7 +38,7 @@ impl<'a> From<&'a TransmitDataMsg> for SequencedPacket { fn from(tdm: &'a TransmitDataMsg) -> Self { SequencedPacket::new( tdm.data.clone(), - tdm.sequence_number.unwrap_or(0), + tdm.sequence_number_opt.unwrap_or(0), tdm.last_data, ) } @@ -253,7 +253,7 @@ mod tests { endpoint: Endpoint::Socket(SocketAddr::from_str("1.2.3.4:80").unwrap()), last_data: true, data: vec![1, 4, 5, 9], - sequence_number: None, + sequence_number_opt: None, }; let result = SequencedPacket::from(&tdm); @@ -267,7 +267,7 @@ mod tests { endpoint: Endpoint::Socket(SocketAddr::from_str("1.2.3.4:80").unwrap()), last_data: true, data: vec![1, 4, 5, 9], - sequence_number: Some(1), + sequence_number_opt: Some(1), }; let result = SequencedPacket::from(&tdm); @@ -280,7 +280,7 @@ mod tests { endpoint: Endpoint::Socket(SocketAddr::from_str("1.2.3.4:80").unwrap()), last_data: false, data: vec![4, 2, 5, 67], - sequence_number: Some(4), + sequence_number_opt: Some(4), }; let result = SequencedPacket::from(&tdm); diff --git a/node/src/sub_lib/stream_handler_pool.rs b/node/src/sub_lib/stream_handler_pool.rs index fcb0f2e64..92ade9931 100644 --- a/node/src/sub_lib/stream_handler_pool.rs +++ b/node/src/sub_lib/stream_handler_pool.rs @@ -8,7 +8,7 @@ use actix::Message; pub struct TransmitDataMsg { pub endpoint: Endpoint, pub last_data: bool, - pub sequence_number: Option, // Some implies clear data; None implies clandestine. + pub sequence_number_opt: Option, // Some implies clear data; None implies clandestine. pub data: Vec, } From c5ba6696d4af745ab1cbc1cd94601d016b3feb64 Mon Sep 17 00:00:00 2001 From: Dan Wiebe Date: Wed, 15 Oct 2025 22:59:43 -0400 Subject: [PATCH 2/5] Test is passing now, and code is cleaner --- node/src/proxy_server/mod.rs | 115 ++++++++++++++++++----------------- 1 file changed, 58 insertions(+), 57 deletions(-) diff --git a/node/src/proxy_server/mod.rs b/node/src/proxy_server/mod.rs index 5f794c921..e0c5736c2 100644 --- a/node/src/proxy_server/mod.rs +++ b/node/src/proxy_server/mod.rs @@ -341,6 +341,10 @@ impl ProxyServer { } fn handle_add_route_result_message(&mut self, msg: AddRouteResultMessage) { + // We can't access self.logger for logging once we obtain mutable access to a stream_info + // element. So we create a delayed_log closure that we can call with self.logger after + // we've finished with the mutable borrow. We have to use #[allow(unused_assignments)] + // because Rust can't figure out that delayed_log will always be assigned before it's used. type DelayedLogArgs = Box; #[allow(unused_assignments)] let mut delayed_log: DelayedLogArgs = Box::new(|_, _, _, _, _| {}); @@ -538,6 +542,9 @@ impl ProxyServer { fn schedule_stream_key_purge(&mut self, stream_key: StreamKey) { let stream_key_purge_delay = self.stream_key_purge_delay; + // We can't access self.logger for logging once we obtain mutable access to a stream_info + // element. So we create a delayed_log closure that we can call with self.logger after + // we've finished with the mutable borrow. let mut delayed_log: Box = Box::new(|_: &Logger| {}); if let Some(stream_info) = self.stream_info_mut(&stream_key) { let host_info = match &stream_info.tunneled_host_opt { @@ -606,62 +613,55 @@ impl ProxyServer { payload_data_len, ); let stream_key = response.stream_key; - let old_timestamp_opt = match self.stream_info_mut(&stream_key) { - Some(info) => { - let time_to_live_opt = info.time_to_live_opt; - // This call to remove_dns_failure_retry is here only because it needs access to - // a mutable StreamInfo, and we have one handy here. It has nothing to do with - // timestamps. - if let Err(e) = ProxyServer::remove_dns_failure_retry(info, &stream_key) { - trace!( - self.logger, - "No DNS retry entry found for stream key {} during a successful attempt: {}", - &stream_key, e - ); - } - time_to_live_opt + if let Some(info) = self.stream_info_mut(&stream_key) { + if let Err(e) = ProxyServer::remove_dns_failure_retry(info, &stream_key) { + trace!( + self.logger, + "No DNS retry entry found for stream key {} during a successful attempt: {}", + &stream_key, e + ); } - None => None, - }; - if let Some(old_timestamp) = old_timestamp_opt { - self.log_straggling_packet(&stream_key, payload_data_len, &old_timestamp) - // TODO: Make sure we actually do something (other than logging) about stragglers. - } else { - match self.keys_and_addrs.a_to_b(&stream_key) { - Some(socket_addr) => { - let last_data = response.sequenced_packet.last_data; - let sequence_number = Some( - response.sequenced_packet.sequence_number - + self.browser_proxy_sequence_offset as u64, - ); - self.subs - .as_ref() - .expect("Dispatcher unbound in ProxyServer") - .dispatcher - .try_send(TransmitDataMsg { - endpoint: Endpoint::Socket(socket_addr), - last_data, - sequence_number_opt: Some(response.sequenced_packet.sequence_number), - data: response.sequenced_packet.data, - }) - .expect("Dispatcher is dead"); - if last_data { - self.purge_stream_key(&stream_key, "last data received from the exit node"); + } + if let Some(info) = self.stream_info(&stream_key) { + if let Some(old_timestamp) = info.time_to_live_opt { + self.log_straggling_packet(&stream_key, payload_data_len, &old_timestamp) + } else { + match self.keys_and_addrs.a_to_b(&stream_key) { + Some(socket_addr) => { + let last_data = response.sequenced_packet.last_data; + let sequence_number_opt = Some( + response.sequenced_packet.sequence_number + + self.browser_proxy_sequence_offset as u64, + ); + self.subs + .as_ref() + .expect("Dispatcher unbound in ProxyServer") + .dispatcher + .try_send(TransmitDataMsg { + endpoint: Endpoint::Socket(socket_addr), + last_data, + sequence_number_opt, + data: response.sequenced_packet.data, + }) + .expect("Dispatcher is dead"); + if last_data { + self.purge_stream_key(&stream_key, "last data received from the exit node"); + } + } + None => { + // TODO GH-608: It would be really nice to be able to send an InboundClientData with last_data: true + // back to the ProxyClient (and the distant server) so that the server could shut down + // its stream, since the browser has shut down _its_ stream and no more data will + // ever be accepted from the server on that stream; but we don't have enough information + // to do so, since our stream key has been purged and all the information it keyed + // is gone. Sorry, server! + warning!(self.logger, + "Discarding {}-byte packet {} from an unrecognized stream key: {:?}; can't send response back to client", + response.sequenced_packet.data.len(), + response.sequenced_packet.sequence_number, + response.stream_key, + ) } - } - None => { - // TODO GH-608: It would be really nice to be able to send an InboundClientData with last_data: true - // back to the ProxyClient (and the distant server) so that the server could shut down - // its stream, since the browser has shut down _its_ stream and no more data will - // ever be accepted from the server on that stream; but we don't have enough information - // to do so, since our stream key has been purged and all the information it keyed - // is gone. Sorry, server! - warning!(self.logger, - "Discarding {}-byte packet {} from an unrecognized stream key: {:?}; can't send response back to client", - response.sequenced_packet.data.len(), - response.sequenced_packet.sequence_number, - response.stream_key, - ) } } } @@ -2099,7 +2099,7 @@ mod tests { vec![], vec![ExpectedService::Nothing], ), - host: Host::new("booga.com", HTTP_PORT), + host: Host::new("booga.com", TLS_PORT), }) .build(), ); @@ -2109,7 +2109,7 @@ mod tests { let inbound_client_data = InboundClientData { timestamp: SystemTime::now(), client_addr: socket_addr, - reception_port_opt: Some(443), + reception_port_opt: Some(TLS_PORT), last_data: false, is_clandestine: false, sequence_number_opt: Some(0), @@ -2146,8 +2146,9 @@ mod tests { system.run(); let dispatcher_recording = dispatcher_log_arc.lock().unwrap(); + let record = dispatcher_recording.get_record::(0); + assert_eq!(record.sequence_number_opt.unwrap(), 0); let record = dispatcher_recording.get_record::(1); - assert_eq!(record.sequence_number_opt.unwrap(), 1); } From 05adecf4ed96b8270fedb5076ab5ec06defc3634 Mon Sep 17 00:00:00 2001 From: Dan Wiebe Date: Wed, 15 Oct 2025 23:20:22 -0400 Subject: [PATCH 3/5] Cleaned up some code --- node/src/proxy_server/mod.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/node/src/proxy_server/mod.rs b/node/src/proxy_server/mod.rs index e0c5736c2..8e3583067 100644 --- a/node/src/proxy_server/mod.rs +++ b/node/src/proxy_server/mod.rs @@ -430,8 +430,8 @@ impl ProxyServer { // circumstances we want to _retire_ the stream key; so we have a restore_stream_info // flag that starts out true and is set to false if we retire the stream key. It's an // ugly hack. Thanks, Borrow Checker! - let mut stream_info = match self.stream_info(&response.stream_key) { - Some(info) => (*info).clone(), + let mut stream_info = match self.stream_info.remove(&response.stream_key) { + Some(info) => info, None => { error!( self.logger, From 723c8b0b85d423fb47dc10765f73b5e0b57bb630 Mon Sep 17 00:00:00 2001 From: Dan Wiebe Date: Mon, 20 Oct 2025 21:32:50 -0400 Subject: [PATCH 4/5] More review issues --- node/src/neighborhood/mod.rs | 26 +-- .../client_request_payload_factory.rs | 20 +- node/src/proxy_server/mod.rs | 178 ++++++++---------- node/src/sub_lib/stream_key.rs | 9 + node/src/sub_lib/ttl_hashmap.rs | 2 +- 5 files changed, 112 insertions(+), 123 deletions(-) diff --git a/node/src/neighborhood/mod.rs b/node/src/neighborhood/mod.rs index ed7b7a259..c512384f2 100644 --- a/node/src/neighborhood/mod.rs +++ b/node/src/neighborhood/mod.rs @@ -30,7 +30,7 @@ use crate::sub_lib::hopper::{IncipientCoresPackage, MessageType}; use crate::sub_lib::host::Host; use crate::sub_lib::neighborhood::ConnectionProgressEvent; use crate::sub_lib::neighborhood::ExpectedService::{Exit, Nothing, Routing}; -use crate::sub_lib::neighborhood::ExpectedServices::{OneWay, RoundTrip}; +use crate::sub_lib::neighborhood::ExpectedServices::RoundTrip; use crate::sub_lib::neighborhood::RouteQueryResponse; use crate::sub_lib::neighborhood::UpdateNodeRecordMetadataMessage; use crate::sub_lib::neighborhood::{AskAboutDebutGossipMessage, NodeDescriptor}; @@ -2195,19 +2195,27 @@ mod tests { use std::time::Instant; use tokio::prelude::Future; + use crate::accountant::test_utils::bc_from_earning_wallet; + use crate::bootstrapper::CryptDEPair; use crate::db_config::persistent_configuration::PersistentConfigError; use crate::neighborhood::gossip::Gossip_0v1; use crate::neighborhood::gossip::{GossipBuilder, GossipNodeRecord}; use crate::neighborhood::node_record::{NodeRecordInner_0v1, NodeRecordInputs}; + use crate::neighborhood::overall_connection_status::ConnectionStageErrors::{ + NoGossipResponseReceived, PassLoopFound, TcpConnectionFailed, + }; + use crate::neighborhood::overall_connection_status::{ + ConnectionProgress, ConnectionStage, OverallConnectionStage, + }; use crate::stream_messages::{NonClandestineAttributes, RemovedStreamType}; use crate::sub_lib::cryptde::{decodex, encodex, CryptData, PlainData}; use crate::sub_lib::cryptde_null::CryptDENull; use crate::sub_lib::dispatcher::Endpoint; use crate::sub_lib::hop::LiveHop; use crate::sub_lib::hopper::MessageType; + use crate::sub_lib::host::Host; use crate::sub_lib::neighborhood::{ - AskAboutDebutGossipMessage, ConfigChange, ConfigChangeMsg, - NeighborhoodMode, WalletPair, + AskAboutDebutGossipMessage, ConfigChange, ConfigChangeMsg, NeighborhoodMode, WalletPair, }; use crate::sub_lib::neighborhood::{NeighborhoodConfig, DEFAULT_RATE_PACK}; use crate::sub_lib::neighborhood::{NeighborhoodMetadata, RatePack}; @@ -2230,23 +2238,15 @@ mod tests { use crate::test_utils::recorder::peer_actors_builder; use crate::test_utils::recorder::Recorder; use crate::test_utils::recorder::Recording; + use crate::test_utils::unshared_test_utils::notify_handlers::NotifyLaterHandleMock; use crate::test_utils::unshared_test_utils::{ assert_on_initialization_with_panic_on_migration, make_cpm_recipient, make_node_to_ui_recipient, make_recipient_and_recording_arc, prove_that_crash_request_handler_is_hooked_up, AssertionsMessage, }; use crate::test_utils::vec_to_set; - use crate::accountant::test_utils::bc_from_earning_wallet; - use crate::bootstrapper::CryptDEPair; - use crate::neighborhood::overall_connection_status::ConnectionStageErrors::{ - NoGossipResponseReceived, PassLoopFound, TcpConnectionFailed, - }; - use crate::neighborhood::overall_connection_status::{ - ConnectionProgress, ConnectionStage, OverallConnectionStage, - }; - use crate::sub_lib::host::Host; - use crate::test_utils::unshared_test_utils::notify_handlers::NotifyLaterHandleMock; use masq_lib::test_utils::logging::{init_test_logging, TestLogHandler}; + use crate::sub_lib::neighborhood::ExpectedServices::OneWay; lazy_static! { static ref CRYPTDE_PAIR: CryptDEPair = CryptDEPair::null(); diff --git a/node/src/proxy_server/client_request_payload_factory.rs b/node/src/proxy_server/client_request_payload_factory.rs index a68fc4bac..3bebeb7ea 100644 --- a/node/src/proxy_server/client_request_payload_factory.rs +++ b/node/src/proxy_server/client_request_payload_factory.rs @@ -28,12 +28,12 @@ impl ClientRequestPayloadFactory for ClientRequestPayloadFactoryReal { &self, ibcd: &InboundClientData, stream_key: StreamKey, - host_opt: Option, + host_from_history_opt: Option, cryptde: &dyn CryptDE, logger: &Logger, ) -> Option { let protocol_pack = from_ibcd(ibcd).map_err(|e| error!(logger, "{}", e)).ok()?; - let host_from_ibcd = Box::new(|| { + let host_from_request_result_closure = Box::new(|| { let data = PlainData::new(&ibcd.data); match protocol_pack.find_host(&data) { Some(host) => Ok(host), @@ -45,15 +45,13 @@ impl ClientRequestPayloadFactory for ClientRequestPayloadFactoryReal { )), } }); - let target_host: Host = match host_from_ibcd() { - Ok(host) => host, - Err(e) => match host_opt { - Some(host) => host, - None => { - error!(logger, "{}", e); - return None; - } - }, + let target_host = match (host_from_request_result_closure(), host_from_history_opt) { + (Ok(host), _) => host, + (Err(_), Some(host)) => host, + (Err(e), None) => { + error!(logger, "{}", e); + return None; + } }; let sequence_number = match ibcd.sequence_number_opt { Some(sequence_number) => sequence_number, diff --git a/node/src/proxy_server/mod.rs b/node/src/proxy_server/mod.rs index 8e3583067..68d7b1ebc 100644 --- a/node/src/proxy_server/mod.rs +++ b/node/src/proxy_server/mod.rs @@ -129,7 +129,8 @@ impl Handler for ProxyServer { self.tls_connect(&msg); self.browser_proxy_sequence_offset = true; } else if let Err(e) = - self.help(|helper, proxy| helper.handle_normal_client_data(proxy, msg, false)) + // NOTE: I removed a 'false' parameter here for retire_stream_key because I think it was wrong. + self.help(|helper, proxy| helper.handle_normal_client_data(proxy, msg)) { error!(self.logger, "{}", e) } @@ -618,7 +619,8 @@ impl ProxyServer { trace!( self.logger, "No DNS retry entry found for stream key {} during a successful attempt: {}", - &stream_key, e + &stream_key, + e ); } } @@ -645,7 +647,10 @@ impl ProxyServer { }) .expect("Dispatcher is dead"); if last_data { - self.purge_stream_key(&stream_key, "last data received from the exit node"); + self.purge_stream_key( + &stream_key, + "last data received from the exit node", + ); } } None => { @@ -743,8 +748,7 @@ impl ProxyServer { sequence_number_opt: Some(nca.sequence_number), data: vec![], }; - if let Err(e) = - self.help(|helper, proxy| helper.handle_normal_client_data(proxy, ibcd, true)) + if let Err(e) = self.help(|helper, proxy| helper.handle_normal_client_data(proxy, ibcd)) { error!(self.logger, "{}", e) }; @@ -813,7 +817,7 @@ impl ProxyServer { }; let new_ibcd = match tunnelled_host_opt { Some(_) => InboundClientData { - reception_port_opt: Some(443), + reception_port_opt: Some(TLS_PORT), ..ibcd }, None => ibcd, @@ -1028,13 +1032,13 @@ impl ProxyServer { None => { error!(self.logger, "Can't pay for return services consumed: received response with unrecognized stream key {:?}. Ignoring", stream_key); None - }, - Some(stream_info) => match &stream_info.route_opt { - None => { - error!(self.logger, "Can't pay for return services consumed: stream_info contains no route for stream key {:?}", stream_key); - None - }, - Some(route) => match route.expected_services { + } + Some(stream_info) => { + let route = stream_info + .route_opt + .as_ref() + .unwrap_or_else(|| panic!("Internal error: Request was sent over stream {:?} without an associated route being stored in stream_info: can't pay", stream_key)); + match route.expected_services { ExpectedServices::RoundTrip(_, ref return_services) => Some(return_services.clone()), _ => panic!("Internal error: ExpectedServices in ProxyServer for stream key {:?} is not RoundTrip", stream_key), } @@ -1113,15 +1117,6 @@ pub trait IBCDHelper { &self, proxy_s: &mut ProxyServer, msg: InboundClientData, - // TODO: I was wondering: could we just use msg.is_last_data here, instead of a whole different - // parameter? Then I thought no, they're not the same: is_last_data means the client won't - // send any more data, but we don't want to retire_stream_key until we've waited for any - // straggling responses from the server, so that we can pay for them. However, it turns out - // that if there is any such delay for straggling data, it's doesn't have anything to do with - // retire_stream_key, because retire_stream_key is always equal to msg.is_last_data. So we - // _could_ remove retire_stream_key and use msg.is_last_data, but I think the whole thing - // ought to be rejiggered to wait for straggling data. - retire_stream_key: bool, ) -> Result<(), String>; fn request_route_and_transmit( @@ -1201,9 +1196,9 @@ impl IBCDHelper for IBCDHelperReal { &self, proxy_server: &mut ProxyServer, msg: InboundClientData, - retire_stream_key: bool, ) -> Result<(), String> { let client_addr = msg.client_addr; + let last_data = msg.last_data; if proxy_server.consuming_wallet_balance.is_none() && proxy_server.is_decentralized { let protocol_pack = match from_ibcd(&msg) { Err(e) => return Err(e), @@ -1259,13 +1254,8 @@ impl IBCDHelper for IBCDHelperReal { stream_info.protocol_opt = Some(payload.protocol); } } - let args = TransmitToHopperArgs::new( - proxy_server, - payload, - client_addr, - timestamp, - retire_stream_key, - ); + let args = + TransmitToHopperArgs::new(proxy_server, payload, client_addr, timestamp, last_data); let pld = &args.payload; let stream_info = proxy_server .stream_info(&pld.stream_key) @@ -1696,16 +1686,9 @@ mod tests { } } - fn return_route_with_id(cryptde: &dyn CryptDE, return_route_id: u32) -> Route { - let cover_hop = make_cover_hop(cryptde); - let id_hop = cryptde - .encode( - &cryptde.public_key(), - &PlainData::from(serde_cbor::ser::to_vec(&return_route_id).unwrap()), - ) - .unwrap(); + fn return_route(cryptde: &dyn CryptDE) -> Route { Route { - hops: vec![cover_hop, id_hop], + hops: vec![make_cover_hop(cryptde)], } } @@ -1724,7 +1707,7 @@ mod tests { #[derive(Default)] struct IBCDHelperMock { - handle_normal_client_data_params: Arc>>, + handle_normal_client_data_params: Arc>>, handle_normal_client_data_results: RefCell>>, } @@ -1733,12 +1716,11 @@ mod tests { &self, _proxy_s: &mut ProxyServer, msg: InboundClientData, - retire_stream_key: bool, ) -> Result<(), String> { self.handle_normal_client_data_params .lock() .unwrap() - .push((msg, retire_stream_key)); + .push(msg); self.handle_normal_client_data_results .borrow_mut() .remove(0) @@ -1757,7 +1739,7 @@ mod tests { impl IBCDHelperMock { fn handle_normal_client_data_params( mut self, - params: &Arc>>, + params: &Arc>>, ) -> Self { self.handle_normal_client_data_params = params.clone(); self @@ -1825,6 +1807,30 @@ mod tests { assert_eq!(result, back_services); } + #[test] + #[should_panic( + expected = "Internal error: Request was sent over stream Y29uc3RhbnQgZm9yIHBhbmljIG0 without an associated route being stored in stream_info: can't pay" + )] + fn get_expected_services_panics_if_stream_info_exists_but_has_no_route() { + let mut subject = ProxyServer::new( + CRYPTDE_PAIR.clone(), + true, + Some(STANDARD_CONSUMING_WALLET_BALANCE), + false, + false, + ); + let stream_key = StreamKey::from_bytes(b"constant for panic message"); + subject.stream_info.insert( + stream_key.clone(), + StreamInfoBuilder::new() + // no route_opt: problem + .protocol(ProxyProtocol::TLS) + .build(), + ); + + let _ = subject.get_expected_return_services(&stream_key).unwrap(); + } + #[test] fn proxy_server_receives_http_request_with_new_stream_key_from_dispatcher_then_sends_cores_package_to_hopper( ) { @@ -2087,9 +2093,6 @@ mod tests { subject .keys_and_addrs .insert(stream_key.clone(), socket_addr.clone()); - subject - .stream_info - .insert(stream_key.clone(), StreamInfoBuilder::new().build()); subject.stream_info.insert( stream_key.clone(), StreamInfoBuilder::new() @@ -2129,7 +2132,7 @@ mod tests { ExpiredCoresPackage::new( SocketAddr::from_str("1.2.3.4:1234").unwrap(), Some(make_wallet("irrelevant")), - return_route_with_id(cryptde, 1234), + return_route(cryptde), client_response_payload.into(), 0, ); @@ -2953,9 +2956,8 @@ mod tests { let http_request = b"GET /index.html HTTP/1.1\r\nHost: nowhere.com\r\n\r\n"; let destination_key = PublicKey::from(&b"our destination"[..]); let route = Route { hops: vec![] }; - let route_with_rrid = route.clone(); let route_query_response = RouteQueryResponse { - route, + route: route.clone(), expected_services: ExpectedServices::RoundTrip( vec![make_exit_service_from_key(destination_key.clone())], vec![], @@ -2989,7 +2991,7 @@ mod tests { }; let expected_pkg = IncipientCoresPackage::new( main_cryptde, - route_with_rrid, + route, expected_payload.into(), &destination_key, ) @@ -3365,7 +3367,6 @@ mod tests { System::current().stop(); system.run(); - TestLogHandler::new().exists_log_containing(&format!("ERROR: ProxyServer: No dns_failure_retry found for stream key {stream_key} while handling AddRouteResultMessage")); } #[test] @@ -4039,7 +4040,7 @@ mod tests { .build(), ); let subject_addr: Addr = subject.start(); - let remaining_route = return_route_with_id(cryptde, 0); + let remaining_route = return_route(cryptde); let client_response_payload = ClientResponsePayload_0v1 { stream_key: stream_key.clone(), sequenced_packet: SequencedPacket { @@ -4137,7 +4138,7 @@ mod tests { ExpiredCoresPackage::new( SocketAddr::from_str("1.2.3.4:1234").unwrap(), Some(make_wallet("irrelevant")), - return_route_with_id(cryptde, 0), + return_route(cryptde), client_response_payload.into(), 0, ); @@ -4347,7 +4348,7 @@ mod tests { ExpiredCoresPackage::new( SocketAddr::from_str("1.2.3.4:1234").unwrap(), Some(make_wallet("irrelevant")), - return_route_with_id(cryptde, 0), + return_route(cryptde), client_response_payload.into(), 5432, ); @@ -4449,7 +4450,7 @@ mod tests { ExpiredCoresPackage::new( SocketAddr::from_str("1.2.3.4:1234").unwrap(), Some(make_wallet("irrelevant")), - return_route_with_id(cryptde, 0), + return_route(cryptde), first_client_response_payload.into(), 0, ); @@ -4467,7 +4468,7 @@ mod tests { ExpiredCoresPackage::new( SocketAddr::from_str("1.2.3.5:1235").unwrap(), Some(make_wallet("irrelevant")), - return_route_with_id(cryptde, 1235), + return_route(cryptde), second_client_response_payload.into(), 0, ); @@ -4564,7 +4565,6 @@ mod tests { let test_name = "dns_retry_entry_is_removed_after_a_successful_client_response"; let system = System::new(test_name); let cryptde = CRYPTDE_PAIR.main.as_ref(); - let logger = Logger::new(test_name); let mut subject = ProxyServer::new( CRYPTDE_PAIR.clone(), true, @@ -4636,7 +4636,7 @@ mod tests { ExpiredCoresPackage::new( SocketAddr::from_str("1.2.3.4:1234").unwrap(), Some(make_wallet("irrelevant")), - return_route_with_id(cryptde, 0), + return_route(cryptde), first_client_response_payload.into(), 0, ); @@ -4720,7 +4720,7 @@ mod tests { ExpiredCoresPackage::new( SocketAddr::from_str("1.2.3.4:1234").unwrap(), Some(make_wallet("irrelevant")), - return_route_with_id(cryptde, 0), + return_route(cryptde), client_response_payload.into(), 0, ); @@ -4816,7 +4816,7 @@ mod tests { ExpiredCoresPackage::new( SocketAddr::from_str("1.2.3.4:1234").unwrap(), Some(make_wallet("irrelevant")), - return_route_with_id(cryptde, 0), + return_route(cryptde), dns_resolve_failure.into(), 0, ); @@ -4908,7 +4908,7 @@ mod tests { ExpiredCoresPackage::new( SocketAddr::from_str("1.2.3.4:1234").unwrap(), Some(make_wallet("irrelevant")), - return_route_with_id(cryptde, 0), + return_route(cryptde), dns_resolve_failure_payload.into(), 0, ); @@ -5008,7 +5008,7 @@ mod tests { ExpiredCoresPackage::new( SocketAddr::from_str("1.2.3.4:1234").unwrap(), Some(make_wallet("irrelevant")), - return_route_with_id(cryptde, 0), + return_route(cryptde), dns_resolve_failure.into(), 0, ); @@ -5052,7 +5052,6 @@ mod tests { false, ); let stream_key = StreamKey::make_meaningless_stream_key(); - let return_route_id = 0; let socket_addr = SocketAddr::from_str("1.2.3.4:5678").unwrap(); let client_payload = make_request_payload(111, cryptde); let exit_public_key = PublicKey::from(&b"exit_key"[..]); @@ -5085,7 +5084,7 @@ mod tests { ExpiredCoresPackage::new( socket_addr, Some(make_wallet("irrelevant")), - return_route_with_id(cryptde, return_route_id), + return_route(cryptde), dns_resolve_failure.into(), 0, ); @@ -5125,7 +5124,6 @@ mod tests { false, ); let stream_key = StreamKey::make_meaningless_stream_key(); - let return_route_id = 0; let socket_addr = SocketAddr::from_str("1.2.3.4:5678").unwrap(); let client_payload = make_request_payload(111, cryptde); subject @@ -5161,7 +5159,7 @@ mod tests { ExpiredCoresPackage::new( SocketAddr::from_str("1.2.3.4:1234").unwrap(), Some(make_wallet("irrelevant")), - return_route_with_id(cryptde, return_route_id), + return_route(cryptde), dns_resolve_failure.into(), 0, ); @@ -5237,7 +5235,7 @@ mod tests { ExpiredCoresPackage::new( SocketAddr::from_str("1.2.3.4:1234").unwrap(), Some(make_wallet("irrelevant")), - return_route_with_id(cryptde, 0), + return_route(cryptde), dns_resolve_failure.into(), 0, ); @@ -5292,7 +5290,7 @@ mod tests { ExpiredCoresPackage::new( SocketAddr::from_str("1.2.3.4:1234").unwrap(), Some(make_wallet("irrelevant")), - return_route_with_id(cryptde, 0), + return_route(cryptde), dns_resolve_failure.into(), 0, ); @@ -5335,7 +5333,6 @@ mod tests { fn handle_dns_resolve_failure_sent_request_retry() { let test_name = "handle_dns_resolve_failure_sent_request_retry"; let system = System::new(test_name); - let logger = Logger::new(test_name); let resolve_message_params_arc = Arc::new(Mutex::new(vec![])); let (neighborhood_mock, _, _) = make_recorder(); let exit_public_key = PublicKey::from(&b"exit_key"[..]); @@ -5401,7 +5398,7 @@ mod tests { ExpiredCoresPackage::new( SocketAddr::from_str("1.2.3.4:1234").unwrap(), Some(make_wallet("irrelevant")), - return_route_with_id(cryptde, 0), + return_route(cryptde), dns_resolve_failure.into(), 0, ); @@ -5491,7 +5488,7 @@ mod tests { ExpiredCoresPackage::new( SocketAddr::from_str("1.2.3.4:1234").unwrap(), Some(make_wallet("irrelevant")), - return_route_with_id(cryptde, 0), + return_route(cryptde), dns_resolve_failure.into(), 0, ); @@ -5586,7 +5583,7 @@ mod tests { ExpiredCoresPackage::new( SocketAddr::from_str("1.2.3.4:1234").unwrap(), Some(make_wallet("irrelevant")), - return_route_with_id(cryptde, 0), + return_route(cryptde), dns_resolve_failure.into(), 0, ); @@ -5639,7 +5636,7 @@ mod tests { subject .keys_and_addrs .insert(stream_key.clone(), socket_addr.clone()); - let remaining_route = return_route_with_id(cryptde, 4321); + let remaining_route = return_route(cryptde); subject.stream_info.insert( stream_key.clone(), StreamInfoBuilder::new() @@ -5744,7 +5741,7 @@ mod tests { let expired_cores_package = ExpiredCoresPackage::new( SocketAddr::from_str("1.2.3.4:1234").unwrap(), Some(make_wallet("irrelevant")), - return_route_with_id(cryptde, 0 /* dummy */), + return_route(cryptde), client_response_payload, 0, ); @@ -5761,7 +5758,6 @@ mod tests { #[test] fn handle_stream_shutdown_msg_handles_unknown_peer_addr() { - let test_name = "handle_stream_shutdown_msg_handles_unknown_peer_addr"; let mut subject = ProxyServer::new(CRYPTDE_PAIR.clone(), true, None, false, false); let unaffected_socket_addr = SocketAddr::from_str("2.3.4.5:6789").unwrap(); let unaffected_stream_key = StreamKey::make_meaningful_stream_key("unaffected"); @@ -6129,14 +6125,13 @@ mod tests { let after = SystemTime::now(); let handle_normal_client_data = help_to_handle_normal_client_data_params_arc.lock().unwrap(); - let (inbound_client_data_msg, retire_stream_key) = &handle_normal_client_data[0]; + let inbound_client_data_msg = &handle_normal_client_data[0]; assert_eq!(inbound_client_data_msg.client_addr, socket_addr); assert_eq!(inbound_client_data_msg.data, Vec::::new()); assert_eq!(inbound_client_data_msg.last_data, true); assert_eq!(inbound_client_data_msg.is_clandestine, false); let actual_timestamp = inbound_client_data_msg.timestamp; assert!(before <= actual_timestamp && actual_timestamp <= after); - assert_eq!(*retire_stream_key, true) } #[test] @@ -6153,11 +6148,8 @@ mod tests { data: vec![], }; - let result = IBCDHelperReal::new().handle_normal_client_data( - &mut proxy_server, - inbound_client_data_msg, - true, - ); + let result = IBCDHelperReal::new() + .handle_normal_client_data(&mut proxy_server, inbound_client_data_msg); assert_eq!( result, @@ -6291,11 +6283,8 @@ mod tests { data: vec![], }; - let result = IBCDHelperReal::new().handle_normal_client_data( - &mut proxy_server, - inbound_client_data_msg, - true, - ); + let result = IBCDHelperReal::new() + .handle_normal_client_data(&mut proxy_server, inbound_client_data_msg); assert_eq!( result, @@ -6339,9 +6328,7 @@ mod tests { ) .unwrap(); let stream_key_factory = StreamKeyFactoryMock::new().make_result(stream_key.clone()); - let system = System::new( - "proxy_server_receives_http_request_from_dispatcher_then_sends_cores_package_to_hopper", - ); + let system = System::new(test_name); let mut subject = ProxyServer::new( CRYPTDE_PAIR.clone(), true, @@ -6413,9 +6400,7 @@ mod tests { ) .unwrap(); let stream_key_factory = StreamKeyFactoryMock::new().make_result(stream_key.clone()); - let system = System::new( - "new_http_request_creates_new_exhausted_entry_inside_dns_retries_hashmap_zero_hop", - ); + let system = System::new(test_name); let mut subject = ProxyServer::new( CRYPTDE_PAIR.clone(), false, @@ -6537,11 +6522,8 @@ mod tests { data: expected_data, }; - let result = IBCDHelperReal::new().handle_normal_client_data( - &mut proxy_server, - inbound_client_data_msg, - true, - ); + let result = IBCDHelperReal::new() + .handle_normal_client_data(&mut proxy_server, inbound_client_data_msg); assert_eq!( result, diff --git a/node/src/sub_lib/stream_key.rs b/node/src/sub_lib/stream_key.rs index 0e6132a7f..8e6629ab5 100644 --- a/node/src/sub_lib/stream_key.rs +++ b/node/src/sub_lib/stream_key.rs @@ -93,6 +93,15 @@ impl StreamKey { hash: hash.digest().bytes(), } } + + #[cfg(test)] + pub fn from_bytes(bytes: &[u8]) -> StreamKey { + let mut hash = [0xA; sha1::DIGEST_LENGTH]; + for i in 0..std::cmp::min(sha1::DIGEST_LENGTH, bytes.len()) { + hash[i] = bytes[i]; + } + StreamKey { hash } + } } impl StreamKey { diff --git a/node/src/sub_lib/ttl_hashmap.rs b/node/src/sub_lib/ttl_hashmap.rs index 1c32dea0d..4782e78a3 100644 --- a/node/src/sub_lib/ttl_hashmap.rs +++ b/node/src/sub_lib/ttl_hashmap.rs @@ -94,8 +94,8 @@ where .collect() }; + let mut data = self.data.borrow_mut(); expired.iter().for_each(|key| { - let mut data = self.data.borrow_mut(); match data.remove(key) { Some((value, _)) => { if !(self.retire_closure)(key, value.as_ref()) { From 91193af5b3f128df0954d9d831a8b8d81215a40c Mon Sep 17 00:00:00 2001 From: Dan Wiebe Date: Mon, 20 Oct 2025 22:36:24 -0400 Subject: [PATCH 5/5] Formatting --- node/src/neighborhood/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/node/src/neighborhood/mod.rs b/node/src/neighborhood/mod.rs index c512384f2..2345342d8 100644 --- a/node/src/neighborhood/mod.rs +++ b/node/src/neighborhood/mod.rs @@ -2214,6 +2214,7 @@ mod tests { use crate::sub_lib::hop::LiveHop; use crate::sub_lib::hopper::MessageType; use crate::sub_lib::host::Host; + use crate::sub_lib::neighborhood::ExpectedServices::OneWay; use crate::sub_lib::neighborhood::{ AskAboutDebutGossipMessage, ConfigChange, ConfigChangeMsg, NeighborhoodMode, WalletPair, }; @@ -2246,7 +2247,6 @@ mod tests { }; use crate::test_utils::vec_to_set; use masq_lib::test_utils::logging::{init_test_logging, TestLogHandler}; - use crate::sub_lib::neighborhood::ExpectedServices::OneWay; lazy_static! { static ref CRYPTDE_PAIR: CryptDEPair = CryptDEPair::null();