13
13
// limitations under the License.
14
14
15
15
use std:: collections:: HashMap ;
16
+ use std:: sync:: Arc ;
16
17
use std:: time:: Duration ;
17
18
use std:: time:: Instant ;
18
19
19
20
use reqwest:: header:: HeaderMap ;
20
21
use reqwest:: header:: HeaderValue ;
21
22
use reqwest:: Client ;
22
23
use reqwest:: ClientBuilder ;
23
- use reqwest:: Response ;
24
24
use serde:: Deserialize ;
25
25
use sqllogictest:: DBOutput ;
26
26
use sqllogictest:: DefaultColumnType ;
27
27
28
- use crate :: error :: DSqlLogicTestError :: Databend ;
28
+ use crate :: client :: global_cookie_store :: GlobalCookieStore ;
29
29
use crate :: error:: Result ;
30
30
use crate :: util:: parser_rows;
31
31
use crate :: util:: HttpSessionConf ;
32
32
33
- const SESSION_HEADER : & str = "X-DATABEND-SESSION" ;
34
-
35
33
pub struct HttpClient {
36
34
pub client : Client ,
37
35
pub session_token : String ,
38
- pub session_headers : HeaderMap ,
39
36
pub debug : bool ,
40
37
pub session : Option < HttpSessionConf > ,
41
38
pub port : u16 ,
@@ -86,59 +83,28 @@ impl HttpClient {
86
83
header. insert ( "Accept" , HeaderValue :: from_str ( "application/json" ) . unwrap ( ) ) ;
87
84
header. insert (
88
85
"X-DATABEND-CLIENT-CAPS" ,
89
- HeaderValue :: from_str ( "session_header " ) . unwrap ( ) ,
86
+ HeaderValue :: from_str ( "session_cookie " ) . unwrap ( ) ,
90
87
) ;
88
+ let cookie_provider = GlobalCookieStore :: new ( ) ;
91
89
let client = ClientBuilder :: new ( )
90
+ . cookie_provider ( Arc :: new ( cookie_provider) )
92
91
. default_headers ( header)
93
92
// https://github.com/hyperium/hyper/issues/2136#issuecomment-589488526
94
93
. http2_keep_alive_timeout ( Duration :: from_secs ( 15 ) )
95
94
. pool_max_idle_per_host ( 0 )
96
95
. build ( ) ?;
97
- let mut session_headers = HeaderMap :: new ( ) ;
98
- session_headers. insert ( SESSION_HEADER , HeaderValue :: from_str ( "" ) . unwrap ( ) ) ;
99
- let mut res = Self {
100
- client,
101
- session_token : "" . to_string ( ) ,
102
- session_headers,
103
- session : None ,
104
- debug : false ,
105
- port,
106
- } ;
107
- res. login ( ) . await ?;
108
- Ok ( res)
109
- }
110
96
111
- async fn update_session_header ( & mut self , response : Response ) -> Result < Response > {
112
- if let Some ( value) = response. headers ( ) . get ( SESSION_HEADER ) {
113
- let session_header = value. to_str ( ) . unwrap ( ) . to_owned ( ) ;
114
- if !session_header. is_empty ( ) {
115
- self . session_headers
116
- . insert ( SESSION_HEADER , value. to_owned ( ) ) ;
117
- return Ok ( response) ;
118
- }
119
- }
120
- let meta = format ! ( "response={response:?}" ) ;
121
- let data = response. text ( ) . await . unwrap ( ) ;
122
- Err ( Databend (
123
- format ! ( "{} is empty, {meta}, {data}" , SESSION_HEADER , ) . into ( ) ,
124
- ) )
125
- }
97
+ let url = format ! ( "http://127.0.0.1:{}/v1/session/login" , port) ;
126
98
127
- async fn login ( & mut self ) -> Result < ( ) > {
128
- let url = format ! ( "http://127.0.0.1:{}/v1/session/login" , self . port) ;
129
- let response = self
130
- . client
99
+ let session_token = client
131
100
. post ( & url)
132
- . headers ( self . session_headers . clone ( ) )
133
101
. body ( "{}" )
134
102
. basic_auth ( "root" , Some ( "" ) )
135
103
. send ( )
136
104
. await
137
105
. inspect_err ( |e| {
138
106
println ! ( "fail to send to {}: {:?}" , url, e) ;
139
- } ) ?;
140
- let response = self . update_session_header ( response) . await ?;
141
- self . session_token = response
107
+ } ) ?
142
108
. json :: < LoginResponse > ( )
143
109
. await
144
110
. inspect_err ( |e| {
@@ -147,7 +113,14 @@ impl HttpClient {
147
113
. tokens
148
114
. unwrap ( )
149
115
. session_token ;
150
- Ok ( ( ) )
116
+
117
+ Ok ( Self {
118
+ client,
119
+ session_token,
120
+ session : None ,
121
+ debug : false ,
122
+ port,
123
+ } )
151
124
}
152
125
153
126
pub async fn query ( & mut self , sql : & str ) -> Result < DBOutput < DefaultColumnType > > {
@@ -204,43 +177,43 @@ impl HttpClient {
204
177
}
205
178
206
179
// Send request and get response by json format
207
- async fn post_query ( & mut self , sql : & str , url : & str ) -> Result < QueryResponse > {
180
+ async fn post_query ( & self , sql : & str , url : & str ) -> Result < QueryResponse > {
208
181
let mut query = HashMap :: new ( ) ;
209
182
query. insert ( "sql" , serde_json:: to_value ( sql) ?) ;
210
183
if let Some ( session) = & self . session {
211
184
query. insert ( "session" , serde_json:: to_value ( session) ?) ;
212
185
}
213
- let response = self
186
+ Ok ( self
214
187
. client
215
188
. post ( url)
216
- . headers ( self . session_headers . clone ( ) )
217
189
. json ( & query)
218
190
. bearer_auth ( & self . session_token )
219
191
. send ( )
220
192
. await
221
193
. inspect_err ( |e| {
222
194
println ! ( "fail to send to {}: {:?}" , url, e) ;
223
- } ) ?;
224
- let response = self . update_session_header ( response) . await ?;
225
- Ok ( response. json :: < QueryResponse > ( ) . await . inspect_err ( |e| {
226
- println ! ( "fail to decode json when call {}: {:?}" , url, e) ;
227
- } ) ?)
195
+ } ) ?
196
+ . json :: < QueryResponse > ( )
197
+ . await
198
+ . inspect_err ( |e| {
199
+ println ! ( "fail to decode json when call {}: {:?}" , url, e) ;
200
+ } ) ?)
228
201
}
229
202
230
- async fn poll_query_result ( & mut self , url : & str ) -> Result < QueryResponse > {
231
- let response = self
203
+ async fn poll_query_result ( & self , url : & str ) -> Result < QueryResponse > {
204
+ Ok ( self
232
205
. client
233
206
. get ( url)
234
207
. bearer_auth ( & self . session_token )
235
- . headers ( self . session_headers . clone ( ) )
236
208
. send ( )
237
209
. await
238
210
. inspect_err ( |e| {
239
211
println ! ( "fail to send to {}: {:?}" , url, e) ;
240
- } ) ?;
241
- let response = self . update_session_header ( response) . await ?;
242
- Ok ( response. json :: < QueryResponse > ( ) . await . inspect_err ( |e| {
243
- println ! ( "fail to decode json when call {}: {:?}" , url, e) ;
244
- } ) ?)
212
+ } ) ?
213
+ . json :: < QueryResponse > ( )
214
+ . await
215
+ . inspect_err ( |e| {
216
+ println ! ( "fail to decode json when call {}: {:?}" , url, e) ;
217
+ } ) ?)
245
218
}
246
219
}
0 commit comments