@@ -130,34 +130,21 @@ impl KvbmLeader {
130
130
leader_sockets. pub_url . clone ( ) ,
131
131
leader_sockets. ack_url . clone ( ) ,
132
132
) ;
133
- leader. spawn_barrier_task (
134
- drt,
135
- leader_urls
136
- ) ;
133
+ leader. spawn_barrier_task ( drt, leader_urls) ;
137
134
leader. spawn_zmq_task ( leader_sockets, cancel_token) ;
138
135
139
136
Ok ( leader)
140
137
}
141
138
142
- fn spawn_barrier_task (
143
- & self ,
144
- drt : DistributedRuntime ,
145
- leader_urls : ( String , String ) ,
146
- ) {
139
+ fn spawn_barrier_task ( & self , drt : DistributedRuntime , leader_urls : ( String , String ) ) {
147
140
let state = self . state . clone ( ) ;
148
141
let leader_config = self . config . clone ( ) ;
149
142
let ready = Arc :: clone ( & self . workers_sync_ready ) ;
150
143
let notify = Arc :: clone ( & self . workers_sync_ready_notify ) ;
151
144
let done = Arc :: clone ( & self . workers_sync_done ) ;
152
145
153
146
tokio:: spawn ( async move {
154
- match KvbmLeader :: run_barrier_sync (
155
- drt,
156
- leader_urls,
157
- leader_config,
158
- )
159
- . await
160
- {
147
+ match KvbmLeader :: run_barrier_sync ( drt, leader_urls, leader_config) . await {
161
148
Ok ( ( num_device_blocks, num_host_blocks, num_disk_blocks) ) => {
162
149
// write back results
163
150
state
@@ -250,12 +237,11 @@ impl KvbmLeader {
250
237
num_disk_blocks,
251
238
} ) ;
252
239
253
- let leader_to_worker_barrier: LeaderBarrier < KvbmLeaderData , ( ) > =
254
- LeaderBarrier :: new (
255
- barrier_id_leader_to_worker. clone ( ) ,
256
- leader_config. world_size ,
257
- Some ( Duration :: from_secs ( leader_config. leader_init_timeout_secs ) ) ,
258
- ) ;
240
+ let leader_to_worker_barrier: LeaderBarrier < KvbmLeaderData , ( ) > = LeaderBarrier :: new (
241
+ barrier_id_leader_to_worker. clone ( ) ,
242
+ leader_config. world_size ,
243
+ Some ( Duration :: from_secs ( leader_config. leader_init_timeout_secs ) ) ,
244
+ ) ;
259
245
260
246
let _worker_data = leader_to_worker_barrier
261
247
. sync ( & drt, zmq_data_leader_to_worker. as_ref ( ) )
0 commit comments