4
4
"context"
5
5
"fmt"
6
6
"reflect"
7
+ "sync"
7
8
"time"
8
9
9
10
"github.com/tarantool/go-tarantool/v2"
@@ -195,28 +196,83 @@ func SetInstanceRO(ctx context.Context, instance pool.Instance, isReplica bool)
195
196
196
197
req := tarantool .NewCallRequest ("box.cfg" ).
197
198
Args ([]interface {}{map [string ]bool {"read_only" : isReplica }})
198
- if _ , err := conn .Do (req ).Get (); err != nil {
199
- return err
199
+ if resp , err := conn .Do (req ).Get (); err != nil {
200
+ // return err
201
+ return fmt .Errorf ("%w: resp: %v" , err , resp )
200
202
}
201
203
202
204
return nil
203
205
}
204
206
207
+ type initPoolHandler struct {
208
+ roles map [string ]pool.Role
209
+ rolesMutex sync.Mutex
210
+ }
211
+
212
+ func (h * initPoolHandler ) Discovered (name string , conn * tarantool.Connection ,
213
+ role pool.Role ) error {
214
+ h .rolesMutex .Lock ()
215
+ h .roles [name ] = role
216
+ h .rolesMutex .Unlock ()
217
+ return nil
218
+ }
219
+
220
+ func (h * initPoolHandler ) Deactivated (name string , conn * tarantool.Connection ,
221
+ role pool.Role ) error {
222
+ h .rolesMutex .Lock ()
223
+ delete (h .roles , name )
224
+ h .rolesMutex .Unlock ()
225
+ return nil
226
+ }
227
+
205
228
func SetClusterRO (instances []pool.Instance , roles []bool ) error {
206
229
if len (instances ) != len (roles ) {
207
230
return fmt .Errorf ("number of instances should be equal to number of roles" )
208
231
}
209
232
233
+ ctx , cancel := context .WithTimeout (context .Background (), 500 * time .Millisecond )
234
+
210
235
for i , instance := range instances {
211
- ctx , cancel := GetConnectContext ()
212
- err := SetInstanceRO (ctx , instance , roles [i ])
236
+ err := SetInstanceRO (ctx , instance .Dialer , instance .Opts , roles [i ])
213
237
cancel ()
214
238
if err != nil {
215
239
return err
216
240
}
217
241
}
218
242
219
- return nil
243
+ // if ctx.Done() {
244
+ // return nil, ctx.Err()
245
+ // }
246
+
247
+ h := & initPoolHandler {}
248
+ initPoolOpts := pool.Opts {
249
+ CheckTimeout : 100 * time .Microsecond ,
250
+ ConnectionHandler : h ,
251
+ }
252
+ pool , err := pool .ConnectWithOpts (ctx , instances , initPoolOpts )
253
+ if err != nil {
254
+ return err
255
+ }
256
+ defer pool .Close ()
257
+
258
+ // Prepare roles map that
259
+ expectedRoles := make (map [string ]bool , len (roles ))
260
+ for i , role := range roles {
261
+ expectedRoles [instances [i ].Name ] = role
262
+ }
263
+
264
+ // Wait for the roles to be applied.
265
+ for i := 0 ; i < 100 ; i ++ {
266
+ h .rolesMutex .Lock ()
267
+ if reflect .DeepEqual (h .roles , expectedRoles ) {
268
+ h .rolesMutex .Unlock ()
269
+ return nil
270
+ }
271
+ h .rolesMutex .Unlock ()
272
+
273
+ time .Sleep (100 * time .Millisecond )
274
+ }
275
+ return fmt .Errorf ("Failed to wait roles are applied" )
220
276
}
221
277
222
278
func StartTarantoolInstances (instsOpts []StartOpts ) ([]* TarantoolInstance , error ) {
@@ -242,5 +298,5 @@ func StopTarantoolInstances(instances []*TarantoolInstance) {
242
298
}
243
299
244
300
func GetPoolConnectContext () (context.Context , context.CancelFunc ) {
245
- return context .WithTimeout (context .Background (), 500 * time .Millisecond )
301
+ return context .WithTimeout (context .Background (), 1000 * time .Millisecond )
246
302
}
0 commit comments