@@ -6,9 +6,8 @@ use crate::node::client_handling::websocket::connection_handler::FreshHandler;
66use nym_task:: TaskClient ;
77use rand:: rngs:: OsRng ;
88use std:: net:: SocketAddr ;
9- use std:: process;
10- use std:: sync:: atomic:: { AtomicUsize , Ordering } ;
11- use std:: sync:: Arc ;
9+ use std:: { io, process} ;
10+ use tokio:: net:: TcpStream ;
1211use tokio:: task:: JoinHandle ;
1312use tracing:: * ;
1413
@@ -34,6 +33,76 @@ impl Listener {
3433 }
3534 }
3635
36+ fn active_connections ( & self ) -> usize {
37+ self . shared_state
38+ . metrics
39+ . network
40+ . active_ingress_websocket_connections_count ( )
41+ }
42+
43+ fn prepare_connection_handler (
44+ & self ,
45+ socket : TcpStream ,
46+ remote_address : SocketAddr ,
47+ ) -> FreshHandler < OsRng , TcpStream > {
48+ let shutdown = self
49+ . shutdown
50+ . fork ( format ! ( "websocket_handler_{remote_address}" ) ) ;
51+ FreshHandler :: new (
52+ OsRng ,
53+ socket,
54+ self . shared_state . clone ( ) ,
55+ remote_address,
56+ shutdown,
57+ )
58+ }
59+
60+ fn try_handle_accepted_connection ( & self , accepted : io:: Result < ( TcpStream , SocketAddr ) > ) {
61+ match accepted {
62+ Ok ( ( socket, remote_address) ) => {
63+ trace ! ( "received a socket connection from {remote_address}" ) ;
64+
65+ let active = self . active_connections ( ) ;
66+
67+ // 1. check if we're within the connection limit
68+ if active >= self . maximum_open_connections {
69+ warn ! (
70+ "connection limit exceeded ({}). can't accept request from {remote_address}" ,
71+ self . maximum_open_connections
72+ ) ;
73+ return ;
74+ }
75+
76+ debug ! ( "there are currently {active} connected clients on the gateway websocket" ) ;
77+
78+ // 2. prepare shared data for the new connection handler
79+ let handle = self . prepare_connection_handler ( socket, remote_address) ;
80+
81+ // 3. increment the connection counter.
82+ // make sure to do it before spawning the task,
83+ // as another connection might get accepted before the task is scheduled
84+ // for execution
85+ self . shared_state
86+ . metrics
87+ . network
88+ . new_ingress_websocket_client ( ) ;
89+
90+ // 4. spawn the task handling the client connection
91+ tokio:: spawn ( async move {
92+ // TODO: refactor it similarly to the mixnet listener on the nym-node
93+ let metrics_ref = handle. shared_state . metrics . clone ( ) ;
94+
95+ // 4.1. handle all client requests until connection gets terminated
96+ handle. start_handling ( ) . await ;
97+
98+ // 4.2. decrement the connection counter
99+ metrics_ref. network . disconnected_ingress_websocket_client ( ) ;
100+ } ) ;
101+ }
102+ Err ( err) => warn ! ( "failed to accept client connection: {err}" ) ,
103+ }
104+ }
105+
37106 // TODO: change the signature to pub(crate) async fn run(&self, handler: Handler)
38107
39108 pub ( crate ) async fn run ( & mut self ) {
@@ -46,47 +115,14 @@ impl Listener {
46115 }
47116 } ;
48117
49- let open_connections = Arc :: new ( AtomicUsize :: new ( 0 ) ) ;
50-
51118 while !self . shutdown . is_shutdown ( ) {
52119 tokio:: select! {
53120 biased;
54121 _ = self . shutdown. recv( ) => {
55122 trace!( "client_handling::Listener: received shutdown" ) ;
56123 }
57124 connection = tcp_listener. accept( ) => {
58- match connection {
59- Ok ( ( socket, remote_addr) ) => {
60- let shutdown = self . shutdown. fork( format!( "websocket_handler_{remote_addr}" ) ) ;
61- trace!( "received a socket connection from {remote_addr}" ) ;
62-
63- if open_connections. fetch_add( 1 , Ordering :: SeqCst ) >= self . maximum_open_connections {
64- warn!( "connection limit exceeded ({}). can't accept request from {remote_addr}" , self . maximum_open_connections) ;
65- continue ;
66- }
67-
68- // TODO: I think we *REALLY* need a mechanism for having a maximum number of connected
69- // clients or spawned tokio tasks -> perhaps a worker system?
70- let handle = FreshHandler :: new(
71- OsRng ,
72- socket,
73- self . shared_state. clone( ) ,
74- remote_addr,
75- shutdown,
76- ) ;
77- let open_connections = open_connections. clone( ) ;
78- tokio:: spawn( async move {
79- // TODO: refactor it similarly to the mixnet listener on the nym-node
80- let metrics_ref = handle. shared_state. metrics. clone( ) ;
81- metrics_ref. network. new_ingress_websocket_client( ) ;
82- open_connections. fetch_add( 1 , Ordering :: SeqCst ) ;
83- handle. start_handling( ) . await ;
84- metrics_ref. network. disconnected_ingress_websocket_client( ) ;
85- open_connections. fetch_sub( 1 , Ordering :: SeqCst ) ;
86- } ) ;
87- }
88- Err ( err) => warn!( "failed to get client: {err}" ) ,
89- }
125+ self . try_handle_accepted_connection( connection)
90126 }
91127
92128 }
0 commit comments