6
6
// option. All files in the project carrying such notice may not be copied,
7
7
// modified, or distributed except according to those terms.
8
8
9
- use std:: {
10
- fmt,
11
- future:: Future ,
12
- pin:: Pin ,
13
- task:: { Context , Poll } ,
14
- } ;
15
-
16
- use futures_core:: ready;
17
- #[ cfg( feature = "tracing" ) ]
18
- use {
19
- std:: sync:: Arc ,
20
- tracing:: { debug_span, Span } ,
21
- } ;
9
+ use std:: { fmt, future:: poll_fn, task:: Context } ;
22
10
23
11
use crate :: {
24
12
conn:: {
@@ -55,35 +43,14 @@ impl fmt::Debug for GetConnInner {
55
43
}
56
44
}
57
45
58
- /// This future will take connection from a pool and resolve to [`Conn`].
59
46
#[ derive( Debug ) ]
60
- #[ must_use = "futures do nothing unless you `.await` or poll them" ]
61
- struct GetConn {
47
+ struct GetConnState {
62
48
queue_id : QueueId ,
63
49
pool : Option < Pool > ,
64
50
inner : GetConnInner ,
65
- reset_upon_returning_to_a_pool : bool ,
66
- #[ cfg( feature = "tracing" ) ]
67
- span : Arc < Span > ,
68
- }
69
-
70
- pub ( crate ) async fn get_conn ( pool : Pool ) -> Result < Conn > {
71
- let reset_connection = pool. opts . pool_opts ( ) . reset_connection ( ) ;
72
- GetConn :: new ( pool, reset_connection) . await
73
51
}
74
52
75
- impl GetConn {
76
- fn new ( pool : Pool , reset_upon_returning_to_a_pool : bool ) -> GetConn {
77
- GetConn {
78
- queue_id : QueueId :: next ( ) ,
79
- pool : Some ( pool) ,
80
- inner : GetConnInner :: New ,
81
- reset_upon_returning_to_a_pool,
82
- #[ cfg( feature = "tracing" ) ]
83
- span : Arc :: new ( debug_span ! ( "mysql_async::get_conn" ) ) ,
84
- }
85
- }
86
-
53
+ impl GetConnState {
87
54
fn pool_mut ( & mut self ) -> & mut Pool {
88
55
self . pool
89
56
. as_mut ( )
@@ -97,86 +64,84 @@ impl GetConn {
97
64
}
98
65
}
99
66
100
- // this manual implementation of Future may seem stupid, but we sort
101
- // of need it to get the dropping behavior we want.
102
- impl Future for GetConn {
103
- type Output = Result < Conn > ;
104
-
105
- fn poll ( mut self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Self :: Output > {
106
- #[ cfg( feature = "tracing" ) ]
107
- let span = self . span . clone ( ) ;
108
- #[ cfg( feature = "tracing" ) ]
109
- let _span_guard = span. enter ( ) ;
110
- loop {
111
- match self . inner {
112
- GetConnInner :: New => {
113
- let queue_id = self . queue_id ;
114
- let next = ready ! ( self . pool_mut( ) . poll_new_conn( cx, queue_id) ) ?;
115
- match next {
116
- GetConnInner :: Connecting ( conn_fut) => {
117
- self . inner = GetConnInner :: Connecting ( conn_fut) ;
118
- }
119
- GetConnInner :: Checking ( conn_fut) => {
120
- self . inner = GetConnInner :: Checking ( conn_fut) ;
121
- }
122
- GetConnInner :: Done => unreachable ! (
123
- "Pool::poll_new_conn never gives out already-consumed GetConns"
124
- ) ,
125
- GetConnInner :: New => {
126
- unreachable ! ( "Pool::poll_new_conn never gives out GetConnInner::New" )
127
- }
67
+ /// This future will take connection from a pool and resolve to [`Conn`].
68
+ #[ cfg_attr( feature = "tracing" , tracing:: instrument( level = "debug" , skip_all) ) ]
69
+ pub ( crate ) async fn get_conn ( pool : Pool ) -> Result < Conn > {
70
+ let reset_upon_returning_to_a_pool = pool. opts . pool_opts ( ) . reset_connection ( ) ;
71
+ let queue_id = QueueId :: next ( ) ;
72
+ let mut state = GetConnState {
73
+ queue_id,
74
+ pool : Some ( pool) ,
75
+ inner : GetConnInner :: New ,
76
+ } ;
77
+
78
+ loop {
79
+ match state. inner {
80
+ GetConnInner :: New => {
81
+ let pool = state. pool_mut ( ) ;
82
+ let poll_new = |cx : & mut Context < ' _ > | pool. poll_new_conn ( cx, queue_id) ;
83
+ let next = poll_fn ( poll_new) . await ?;
84
+ match next {
85
+ GetConnInner :: Connecting ( conn_fut) => {
86
+ state. inner = GetConnInner :: Connecting ( conn_fut) ;
87
+ }
88
+ GetConnInner :: Checking ( conn_fut) => {
89
+ state. inner = GetConnInner :: Checking ( conn_fut) ;
90
+ }
91
+ GetConnInner :: Done => unreachable ! (
92
+ "Pool::poll_new_conn never gives out already-consumed GetConns"
93
+ ) ,
94
+ GetConnInner :: New => {
95
+ unreachable ! ( "Pool::poll_new_conn never gives out GetConnInner::New" )
128
96
}
129
97
}
130
- GetConnInner :: Done => {
131
- unreachable ! ( "GetConn::poll polled after returning Async::Ready" ) ;
132
- }
133
- GetConnInner :: Connecting ( ref mut f) => {
134
- let result = ready ! ( Pin :: new( f) . poll( cx) ) ;
135
- let pool = self . pool_take ( ) ;
136
-
137
- self . inner = GetConnInner :: Done ;
138
-
139
- return match result {
140
- Ok ( mut c) => {
141
- c. inner . pool = Some ( pool) ;
142
- c. inner . reset_upon_returning_to_a_pool =
143
- self . reset_upon_returning_to_a_pool ;
144
- Poll :: Ready ( Ok ( c) )
145
- }
146
- Err ( e) => {
147
- pool. cancel_connection ( ) ;
148
- Poll :: Ready ( Err ( e) )
149
- }
150
- } ;
151
- }
152
- GetConnInner :: Checking ( ref mut f) => {
153
- let result = ready ! ( Pin :: new( f) . poll( cx) ) ;
154
- match result {
155
- Ok ( mut c) => {
156
- self . inner = GetConnInner :: Done ;
157
-
158
- let pool = self . pool_take ( ) ;
159
- c. inner . pool = Some ( pool) ;
160
- c. inner . reset_upon_returning_to_a_pool =
161
- self . reset_upon_returning_to_a_pool ;
162
- return Poll :: Ready ( Ok ( c) ) ;
163
- }
164
- Err ( _) => {
165
- // Idling connection is broken. We'll drop it and try again.
166
- self . inner = GetConnInner :: New ;
98
+ }
99
+ GetConnInner :: Done => {
100
+ unreachable ! ( "GetConn::poll polled after returning Async::Ready" ) ;
101
+ }
102
+ GetConnInner :: Connecting ( ref mut f) => {
103
+ let result = f. await ;
104
+ let pool = state. pool_take ( ) ;
105
+ state. inner = GetConnInner :: Done ;
106
+
107
+ return match result {
108
+ Ok ( mut c) => {
109
+ c. inner . pool = Some ( pool) ;
110
+ c. inner . reset_upon_returning_to_a_pool = reset_upon_returning_to_a_pool;
111
+ Ok ( c)
112
+ }
113
+ Err ( e) => {
114
+ pool. cancel_connection ( ) ;
115
+ Err ( e)
116
+ }
117
+ } ;
118
+ }
119
+ GetConnInner :: Checking ( ref mut f) => {
120
+ let result = f. await ;
121
+ match result {
122
+ Ok ( mut c) => {
123
+ state. inner = GetConnInner :: Done ;
124
+
125
+ let pool = state. pool_take ( ) ;
126
+ c. inner . pool = Some ( pool) ;
127
+ c. inner . reset_upon_returning_to_a_pool = reset_upon_returning_to_a_pool;
128
+ return Ok ( c) ;
129
+ }
130
+ Err ( _) => {
131
+ // Idling connection is broken. We'll drop it and try again.
132
+ state. inner = GetConnInner :: New ;
167
133
168
- let pool = self . pool_mut ( ) ;
169
- pool. cancel_connection ( ) ;
170
- continue ;
171
- }
134
+ let pool = state. pool_mut ( ) ;
135
+ pool. cancel_connection ( ) ;
136
+ continue ;
172
137
}
173
138
}
174
139
}
175
140
}
176
141
}
177
142
}
178
143
179
- impl Drop for GetConn {
144
+ impl Drop for GetConnState {
180
145
fn drop ( & mut self ) {
181
146
// We drop a connection before it can be resolved, a.k.a. cancelling it.
182
147
// Make sure we maintain the necessary invariants towards the pool.
0 commit comments