@@ -281,9 +281,8 @@ impl PegboardGateway {
281281		let  mut  ws_rx = client_ws. accept ( ) . await ?; 
282282
283283		// Spawn task to forward messages from server to client 
284- 		let  mut  msg_rx_for_task = msg_rx; 
285- 		tokio:: spawn ( async  move  { 
286- 			while  let  Some ( msg)  = msg_rx_for_task. recv ( ) . await  { 
284+ 		let  mut  server_to_client = tokio:: spawn ( async  move  { 
285+ 			while  let  Some ( msg)  = msg_rx. recv ( ) . await  { 
287286				match  msg { 
288287					TunnelMessageData :: Message ( 
289288						protocol:: ToServerTunnelMessageKind :: ToServerWebSocketMessage ( ws_msg) , 
@@ -302,6 +301,7 @@ impl PegboardGateway {
302301						protocol:: ToServerTunnelMessageKind :: ToServerWebSocketClose ( close) , 
303302					)  => { 
304303						tracing:: info!( ?close,  "server closed websocket" ) ; 
304+ 						// Exit the task - websocket will be closed when handle_websocket_inner exits 
305305						break ; 
306306					} 
307307					TunnelMessageData :: Timeout  => { 
@@ -313,48 +313,76 @@ impl PegboardGateway {
313313			} 
314314		} ) ; 
315315
316- 		// Forward messages from client to server 
317- 		let  mut  close_reason = None ; 
318- 		while  let  Some ( msg)  = ws_rx. next ( ) . await  { 
319- 			match  msg { 
320- 				Result :: Ok ( Message :: Binary ( data) )  => { 
321- 					let  ws_message = protocol:: ToClientTunnelMessageKind :: ToClientWebSocketMessage ( 
322- 						protocol:: ToClientWebSocketMessage  { 
323- 							data :  data. into ( ) , 
324- 							binary :  true , 
325- 						} , 
326- 					) ; 
327- 					if  let  Err ( err)  = self . shared_state . send_message ( request_id,  ws_message) . await  { 
328- 						if  is_tunnel_service_unavailable ( & err)  { 
329- 							tracing:: warn!( "tunnel closed sending binary message" ) ; 
330- 							close_reason = Some ( "Tunnel closed" . to_string ( ) ) ; 
331- 							break ; 
332- 						}  else  { 
333- 							tracing:: error!( ?err,  "error sending binary message" ) ; 
316+ 		// Spawn task to forward messages from client to server 
317+ 		let  shared_state_clone = self . shared_state . clone ( ) ; 
318+ 		let  mut  client_to_server = tokio:: spawn ( async  move  { 
319+ 			let  mut  close_reason = None ; 
320+ 			while  let  Some ( msg)  = ws_rx. next ( ) . await  { 
321+ 				match  msg { 
322+ 					Result :: Ok ( Message :: Binary ( data) )  => { 
323+ 						let  ws_message =
324+ 							protocol:: ToClientTunnelMessageKind :: ToClientWebSocketMessage ( 
325+ 								protocol:: ToClientWebSocketMessage  { 
326+ 									data :  data. into ( ) , 
327+ 									binary :  true , 
328+ 								} , 
329+ 							) ; 
330+ 						if  let  Err ( err)  = shared_state_clone
331+ 							. send_message ( request_id,  ws_message) 
332+ 							. await 
333+ 						{ 
334+ 							if  is_tunnel_service_unavailable ( & err)  { 
335+ 								tracing:: warn!( "tunnel closed sending binary message" ) ; 
336+ 								close_reason = Some ( "Tunnel closed" . to_string ( ) ) ; 
337+ 								break ; 
338+ 							}  else  { 
339+ 								tracing:: error!( ?err,  "error sending binary message" ) ; 
340+ 							} 
334341						} 
335342					} 
336- 				} 
337- 				Result :: Ok ( Message :: Text ( text) )  => { 
338- 					let  ws_message = protocol:: ToClientTunnelMessageKind :: ToClientWebSocketMessage ( 
339- 						protocol:: ToClientWebSocketMessage  { 
340- 							data :  text. as_bytes ( ) . to_vec ( ) , 
341- 							binary :  false , 
342- 						} , 
343- 					) ; 
344- 					if  let  Err ( err)  = self . shared_state . send_message ( request_id,  ws_message) . await  { 
345- 						if  is_tunnel_service_unavailable ( & err)  { 
346- 							tracing:: warn!( "tunnel closed sending text message" ) ; 
347- 							close_reason = Some ( "Tunnel closed" . to_string ( ) ) ; 
348- 							break ; 
349- 						}  else  { 
350- 							tracing:: error!( ?err,  "error sending text message" ) ; 
343+ 					Result :: Ok ( Message :: Text ( text) )  => { 
344+ 						let  ws_message =
345+ 							protocol:: ToClientTunnelMessageKind :: ToClientWebSocketMessage ( 
346+ 								protocol:: ToClientWebSocketMessage  { 
347+ 									data :  text. as_bytes ( ) . to_vec ( ) , 
348+ 									binary :  false , 
349+ 								} , 
350+ 							) ; 
351+ 						if  let  Err ( err)  = shared_state_clone
352+ 							. send_message ( request_id,  ws_message) 
353+ 							. await 
354+ 						{ 
355+ 							if  is_tunnel_service_unavailable ( & err)  { 
356+ 								tracing:: warn!( "tunnel closed sending text message" ) ; 
357+ 								close_reason = Some ( "Tunnel closed" . to_string ( ) ) ; 
358+ 								break ; 
359+ 							}  else  { 
360+ 								tracing:: error!( ?err,  "error sending text message" ) ; 
361+ 							} 
351362						} 
352363					} 
364+ 					Result :: Ok ( Message :: Close ( _) )  | Err ( _)  => break , 
365+ 					_ => { } 
353366				} 
354- 				Result :: Ok ( Message :: Close ( _) )  | Err ( _)  => break , 
355- 				_ => { } 
356367			} 
357- 		} 
368+ 			close_reason
369+ 		} ) ; 
370+ 
371+ 		// Wait for either task to complete 
372+ 		let  close_reason = tokio:: select! { 
373+ 			_ = & mut  server_to_client => { 
374+ 				tracing:: info!( "server to client task completed" ) ; 
375+ 				None 
376+ 			} 
377+ 			res = & mut  client_to_server => { 
378+ 				tracing:: info!( "client to server task completed" ) ; 
379+ 				res. unwrap_or( None ) 
380+ 			} 
381+ 		} ; 
382+ 
383+ 		// Abort remaining tasks 
384+ 		server_to_client. abort ( ) ; 
385+ 		client_to_server. abort ( ) ; 
358386
359387		// Send WebSocket close message 
360388		let  close_message = protocol:: ToClientTunnelMessageKind :: ToClientWebSocketClose ( 
0 commit comments