@@ -54,6 +54,38 @@ impl Recycler {
54
54
eof : false ,
55
55
}
56
56
}
57
+
58
+ fn conn_return ( & mut self , conn : Conn , pool_is_closed : bool ) {
59
+ let mut exchange = self . inner . exchange . lock ( ) . unwrap ( ) ;
60
+ if pool_is_closed || exchange. available . len ( ) >= self . pool_opts . active_bound ( ) {
61
+ drop ( exchange) ;
62
+ self . inner
63
+ . metrics
64
+ . discarded_superfluous_connection
65
+ . fetch_add ( 1 , Ordering :: Relaxed ) ;
66
+ self . discard . push ( conn. close_conn ( ) . boxed ( ) ) ;
67
+ } else {
68
+ self . inner
69
+ . metrics
70
+ . connection_returned_to_pool
71
+ . fetch_add ( 1 , Ordering :: Relaxed ) ;
72
+ #[ cfg( feature = "hdrhistogram" ) ]
73
+ self . inner
74
+ . metrics
75
+ . connection_active_duration
76
+ . lock ( )
77
+ . unwrap ( )
78
+ . saturating_record ( conn. inner . active_since . elapsed ( ) . as_micros ( ) as u64 ) ;
79
+ exchange. available . push_back ( conn. into ( ) ) ;
80
+ self . inner
81
+ . metrics
82
+ . connections_in_pool
83
+ . store ( exchange. available . len ( ) , Ordering :: Relaxed ) ;
84
+ if let Some ( w) = exchange. waiting . pop ( ) {
85
+ w. wake ( ) ;
86
+ }
87
+ }
88
+ }
57
89
}
58
90
59
91
impl Future for Recycler {
@@ -62,44 +94,6 @@ impl Future for Recycler {
62
94
fn poll ( mut self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Self :: Output > {
63
95
let mut close = self . inner . close . load ( Ordering :: Acquire ) ;
64
96
65
- macro_rules! conn_return {
66
- ( $self: ident, $conn: ident, $pool_is_closed: expr) => { {
67
- let mut exchange = $self. inner. exchange. lock( ) . unwrap( ) ;
68
- if $pool_is_closed || exchange. available. len( ) >= $self. pool_opts. active_bound( ) {
69
- drop( exchange) ;
70
- $self
71
- . inner
72
- . metrics
73
- . discarded_superfluous_connection
74
- . fetch_add( 1 , Ordering :: Relaxed ) ;
75
- $self. discard. push( $conn. close_conn( ) . boxed( ) ) ;
76
- } else {
77
- $self
78
- . inner
79
- . metrics
80
- . connection_returned_to_pool
81
- . fetch_add( 1 , Ordering :: Relaxed ) ;
82
- #[ cfg( feature = "hdrhistogram" ) ]
83
- $self
84
- . inner
85
- . metrics
86
- . connection_active_duration
87
- . lock( )
88
- . unwrap( )
89
- . saturating_record( $conn. inner. active_since. elapsed( ) . as_micros( ) as u64 ) ;
90
- exchange. available. push_back( $conn. into( ) ) ;
91
- $self
92
- . inner
93
- . metrics
94
- . connections_in_pool
95
- . store( exchange. available. len( ) , Ordering :: Relaxed ) ;
96
- if let Some ( w) = exchange. waiting. pop( ) {
97
- w. wake( ) ;
98
- }
99
- }
100
- } } ;
101
- }
102
-
103
97
macro_rules! conn_decision {
104
98
( $self: ident, $conn: ident) => {
105
99
if $conn. inner. stream. is_none( ) || $conn. inner. disconnected {
@@ -132,7 +126,7 @@ impl Future for Recycler {
132
126
. fetch_add( 1 , Ordering :: Relaxed ) ;
133
127
$self. reset. push( $conn. reset_for_pool( ) . boxed( ) ) ;
134
128
} else {
135
- conn_return! ( $self, $conn, false ) ;
129
+ $self. conn_return ( $conn, false ) ;
136
130
}
137
131
} ;
138
132
}
@@ -202,7 +196,7 @@ impl Future for Recycler {
202
196
loop {
203
197
match Pin :: new ( & mut self . reset ) . poll_next ( cx) {
204
198
Poll :: Pending | Poll :: Ready ( None ) => break ,
205
- Poll :: Ready ( Some ( Ok ( conn) ) ) => conn_return ! ( self , conn, close) ,
199
+ Poll :: Ready ( Some ( Ok ( conn) ) ) => self . conn_return ( conn, close) ,
206
200
Poll :: Ready ( Some ( Err ( e) ) ) => {
207
201
// an error during reset.
208
202
// replace with a new connection
0 commit comments