13
13
// limitations under the License.
14
14
15
15
use std:: collections:: HashMap ;
16
+ use std:: sync:: Arc ;
16
17
use std:: time:: Duration ;
17
18
use std:: time:: Instant ;
18
19
20
+ use reqwest:: cookie:: CookieStore ;
19
21
use reqwest:: header:: HeaderMap ;
20
22
use reqwest:: header:: HeaderValue ;
21
23
use reqwest:: Client ;
22
24
use reqwest:: ClientBuilder ;
23
- use reqwest:: Response ;
24
25
use serde:: Deserialize ;
25
26
use sqllogictest:: DBOutput ;
26
27
use sqllogictest:: DefaultColumnType ;
28
+ use url:: Url ;
27
29
28
- use crate :: error :: DSqlLogicTestError :: Databend ;
30
+ use crate :: client :: global_cookie_store :: GlobalCookieStore ;
29
31
use crate :: error:: Result ;
30
32
use crate :: util:: parser_rows;
31
33
use crate :: util:: HttpSessionConf ;
32
34
33
- const SESSION_HEADER : & str = "X-DATABEND-SESSION" ;
34
-
35
35
pub struct HttpClient {
36
36
pub client : Client ,
37
37
pub session_token : String ,
38
- pub session_headers : HeaderMap ,
39
38
pub debug : bool ,
40
39
pub session : Option < HttpSessionConf > ,
41
40
pub port : u16 ,
@@ -86,59 +85,28 @@ impl HttpClient {
86
85
header. insert ( "Accept" , HeaderValue :: from_str ( "application/json" ) . unwrap ( ) ) ;
87
86
header. insert (
88
87
"X-DATABEND-CLIENT-CAPS" ,
89
- HeaderValue :: from_str ( "session_header " ) . unwrap ( ) ,
88
+ HeaderValue :: from_str ( "session_cookie " ) . unwrap ( ) ,
90
89
) ;
90
+ let cookie_provider = GlobalCookieStore :: new ( ) ;
91
91
let client = ClientBuilder :: new ( )
92
+ . cookie_provider ( Arc :: new ( cookie_provider) )
92
93
. default_headers ( header)
93
94
// https://github.com/hyperium/hyper/issues/2136#issuecomment-589488526
94
95
. http2_keep_alive_timeout ( Duration :: from_secs ( 15 ) )
95
96
. pool_max_idle_per_host ( 0 )
96
97
. build ( ) ?;
97
- let mut session_headers = HeaderMap :: new ( ) ;
98
- session_headers. insert ( SESSION_HEADER , HeaderValue :: from_str ( "" ) . unwrap ( ) ) ;
99
- let mut res = Self {
100
- client,
101
- session_token : "" . to_string ( ) ,
102
- session_headers,
103
- session : None ,
104
- debug : false ,
105
- port,
106
- } ;
107
- res. login ( ) . await ?;
108
- Ok ( res)
109
- }
110
98
111
- async fn update_session_header ( & mut self , response : Response ) -> Result < Response > {
112
- if let Some ( value) = response. headers ( ) . get ( SESSION_HEADER ) {
113
- let session_header = value. to_str ( ) . unwrap ( ) . to_owned ( ) ;
114
- if !session_header. is_empty ( ) {
115
- self . session_headers
116
- . insert ( SESSION_HEADER , value. to_owned ( ) ) ;
117
- return Ok ( response) ;
118
- }
119
- }
120
- let meta = format ! ( "response={response:?}" ) ;
121
- let data = response. text ( ) . await . unwrap ( ) ;
122
- Err ( Databend (
123
- format ! ( "{} is empty, {meta}, {data}" , SESSION_HEADER , ) . into ( ) ,
124
- ) )
125
- }
99
+ let url = format ! ( "http://127.0.0.1:{}/v1/session/login" , port) ;
126
100
127
- async fn login ( & mut self ) -> Result < ( ) > {
128
- let url = format ! ( "http://127.0.0.1:{}/v1/session/login" , self . port) ;
129
- let response = self
130
- . client
101
+ let session_token = client
131
102
. post ( & url)
132
- . headers ( self . session_headers . clone ( ) )
133
103
. body ( "{}" )
134
104
. basic_auth ( "root" , Some ( "" ) )
135
105
. send ( )
136
106
. await
137
107
. inspect_err ( |e| {
138
108
println ! ( "fail to send to {}: {:?}" , url, e) ;
139
- } ) ?;
140
- let response = self . update_session_header ( response) . await ?;
141
- self . session_token = response
109
+ } ) ?
142
110
. json :: < LoginResponse > ( )
143
111
. await
144
112
. inspect_err ( |e| {
@@ -147,7 +115,14 @@ impl HttpClient {
147
115
. tokens
148
116
. unwrap ( )
149
117
. session_token ;
150
- Ok ( ( ) )
118
+
119
+ Ok ( Self {
120
+ client,
121
+ session_token,
122
+ session : None ,
123
+ debug : false ,
124
+ port,
125
+ } )
151
126
}
152
127
153
128
pub async fn query ( & mut self , sql : & str ) -> Result < DBOutput < DefaultColumnType > > {
@@ -204,43 +179,43 @@ impl HttpClient {
204
179
}
205
180
206
181
// Send request and get response by json format
207
- async fn post_query ( & mut self , sql : & str , url : & str ) -> Result < QueryResponse > {
182
+ async fn post_query ( & self , sql : & str , url : & str ) -> Result < QueryResponse > {
208
183
let mut query = HashMap :: new ( ) ;
209
184
query. insert ( "sql" , serde_json:: to_value ( sql) ?) ;
210
185
if let Some ( session) = & self . session {
211
186
query. insert ( "session" , serde_json:: to_value ( session) ?) ;
212
187
}
213
- let response = self
188
+ Ok ( self
214
189
. client
215
190
. post ( url)
216
- . headers ( self . session_headers . clone ( ) )
217
191
. json ( & query)
218
192
. bearer_auth ( & self . session_token )
219
193
. send ( )
220
194
. await
221
195
. inspect_err ( |e| {
222
196
println ! ( "fail to send to {}: {:?}" , url, e) ;
223
- } ) ?;
224
- let response = self . update_session_header ( response) . await ?;
225
- Ok ( response. json :: < QueryResponse > ( ) . await . inspect_err ( |e| {
226
- println ! ( "fail to decode json when call {}: {:?}" , url, e) ;
227
- } ) ?)
197
+ } ) ?
198
+ . json :: < QueryResponse > ( )
199
+ . await
200
+ . inspect_err ( |e| {
201
+ println ! ( "fail to decode json when call {}: {:?}" , url, e) ;
202
+ } ) ?)
228
203
}
229
204
230
- async fn poll_query_result ( & mut self , url : & str ) -> Result < QueryResponse > {
231
- let response = self
205
+ async fn poll_query_result ( & self , url : & str ) -> Result < QueryResponse > {
206
+ Ok ( self
232
207
. client
233
208
. get ( url)
234
209
. bearer_auth ( & self . session_token )
235
- . headers ( self . session_headers . clone ( ) )
236
210
. send ( )
237
211
. await
238
212
. inspect_err ( |e| {
239
213
println ! ( "fail to send to {}: {:?}" , url, e) ;
240
- } ) ?;
241
- let response = self . update_session_header ( response) . await ?;
242
- Ok ( response. json :: < QueryResponse > ( ) . await . inspect_err ( |e| {
243
- println ! ( "fail to decode json when call {}: {:?}" , url, e) ;
244
- } ) ?)
214
+ } ) ?
215
+ . json :: < QueryResponse > ( )
216
+ . await
217
+ . inspect_err ( |e| {
218
+ println ! ( "fail to decode json when call {}: {:?}" , url, e) ;
219
+ } ) ?)
245
220
}
246
221
}
0 commit comments