@@ -16,6 +16,7 @@ use tonic::Request;
1616use super :: timestamp:: TimestampOracle ;
1717use crate :: internal_err;
1818use crate :: proto:: pdpb;
19+ use crate :: Config ;
1920use crate :: Result ;
2021use crate :: SecurityManager ;
2122use crate :: Timestamp ;
@@ -103,13 +104,9 @@ impl Connection {
103104 Connection { security_mgr }
104105 }
105106
106- pub async fn connect_cluster (
107- & self ,
108- endpoints : & [ String ] ,
109- timeout : Duration ,
110- ) -> Result < Cluster > {
111- let members = self . validate_endpoints ( endpoints, timeout) . await ?;
112- let ( client, members) = self . try_connect_leader ( & members, timeout) . await ?;
107+ pub async fn connect_cluster ( & self , endpoints : & [ String ] , config : & Config ) -> Result < Cluster > {
108+ let members = self . validate_endpoints ( endpoints, config) . await ?;
109+ let ( client, members) = self . try_connect_leader ( & members, config) . await ?;
113110 let id = members. header . as_ref ( ) . unwrap ( ) . cluster_id ;
114111 let tso = TimestampOracle :: new ( id, & client) ?;
115112 let cluster = Cluster {
@@ -122,10 +119,10 @@ impl Connection {
122119 }
123120
124121 // Re-establish connection with PD leader in asynchronous fashion.
125- pub async fn reconnect ( & self , cluster : & mut Cluster , timeout : Duration ) -> Result < ( ) > {
122+ pub async fn reconnect ( & self , cluster : & mut Cluster , config : & Config ) -> Result < ( ) > {
126123 warn ! ( "updating pd client" ) ;
127124 let start = Instant :: now ( ) ;
128- let ( client, members) = self . try_connect_leader ( & cluster. members , timeout ) . await ?;
125+ let ( client, members) = self . try_connect_leader ( & cluster. members , config ) . await ?;
129126 let tso = TimestampOracle :: new ( cluster. id , & client) ?;
130127 * cluster = Cluster {
131128 id : cluster. id ,
@@ -141,7 +138,7 @@ impl Connection {
141138 async fn validate_endpoints (
142139 & self ,
143140 endpoints : & [ String ] ,
144- timeout : Duration ,
141+ config : & Config ,
145142 ) -> Result < pdpb:: GetMembersResponse > {
146143 let mut endpoints_set = HashSet :: with_capacity ( endpoints. len ( ) ) ;
147144
@@ -152,7 +149,7 @@ impl Connection {
152149 return Err ( internal_err ! ( "duplicated PD endpoint {}" , ep) ) ;
153150 }
154151
155- let ( _, resp) = match self . connect ( ep, timeout ) . await {
152+ let ( _, resp) = match self . connect ( ep, config ) . await {
156153 Ok ( resp) => resp,
157154 // Ignore failed PD node.
158155 Err ( e) => {
@@ -193,11 +190,11 @@ impl Connection {
193190 async fn connect (
194191 & self ,
195192 addr : & str ,
196- _timeout : Duration ,
193+ config : & Config ,
197194 ) -> Result < ( pdpb:: pd_client:: PdClient < Channel > , pdpb:: GetMembersResponse ) > {
198195 let mut client = self
199196 . security_mgr
200- . connect ( addr, pdpb:: pd_client:: PdClient :: < Channel > :: new)
197+ . connect ( addr, pdpb:: pd_client:: PdClient :: < Channel > :: new, config )
201198 . await ?;
202199 let resp: pdpb:: GetMembersResponse = client
203200 . get_members ( pdpb:: GetMembersRequest :: default ( ) )
@@ -210,9 +207,9 @@ impl Connection {
210207 & self ,
211208 addr : & str ,
212209 cluster_id : u64 ,
213- timeout : Duration ,
210+ config : & Config ,
214211 ) -> Result < ( pdpb:: pd_client:: PdClient < Channel > , pdpb:: GetMembersResponse ) > {
215- let ( client, r) = self . connect ( addr, timeout ) . await ?;
212+ let ( client, r) = self . connect ( addr, config ) . await ?;
216213 Connection :: validate_cluster_id ( addr, & r, cluster_id) ?;
217214 Ok ( ( client, r) )
218215 }
@@ -238,7 +235,7 @@ impl Connection {
238235 async fn try_connect_leader (
239236 & self ,
240237 previous : & pdpb:: GetMembersResponse ,
241- timeout : Duration ,
238+ config : & Config ,
242239 ) -> Result < ( pdpb:: pd_client:: PdClient < Channel > , pdpb:: GetMembersResponse ) > {
243240 let previous_leader = previous. leader . as_ref ( ) . unwrap ( ) ;
244241 let members = & previous. members ;
@@ -252,7 +249,7 @@ impl Connection {
252249 . chain ( Some ( previous_leader) )
253250 {
254251 for ep in & m. client_urls {
255- match self . try_connect ( ep. as_str ( ) , cluster_id, timeout ) . await {
252+ match self . try_connect ( ep. as_str ( ) , cluster_id, config ) . await {
256253 Ok ( ( _, r) ) => {
257254 resp = Some ( r) ;
258255 break ' outer;
@@ -269,7 +266,7 @@ impl Connection {
269266 if let Some ( resp) = resp {
270267 let leader = resp. leader . as_ref ( ) . unwrap ( ) ;
271268 for ep in & leader. client_urls {
272- let r = self . try_connect ( ep. as_str ( ) , cluster_id, timeout ) . await ;
269+ let r = self . try_connect ( ep. as_str ( ) , cluster_id, config ) . await ;
273270 if r. is_ok ( ) {
274271 return r;
275272 }
0 commit comments