|
4 | 4 | "context"
|
5 | 5 | "fmt"
|
6 | 6 | "reflect"
|
| 7 | + "sync" |
7 | 8 | "time"
|
8 | 9 |
|
9 | 10 | "github.com/tarantool/go-tarantool/v2"
|
@@ -195,28 +196,84 @@ func SetInstanceRO(ctx context.Context, instance pool.Instance, isReplica bool)
|
195 | 196 |
|
196 | 197 | req := tarantool.NewCallRequest("box.cfg").
|
197 | 198 | Args([]interface{}{map[string]bool{"read_only": isReplica}})
|
198 |
| - if _, err := conn.Do(req).Get(); err != nil { |
199 |
| - return err |
| 199 | + if resp, err := conn.Do(req).Get(); err != nil { |
| 200 | + // return err |
| 201 | + return fmt.Errorf("%w: resp: %v", err, resp) |
200 | 202 | }
|
201 | 203 |
|
202 | 204 | return nil
|
203 | 205 | }
|
204 | 206 |
|
| 207 | +type initPoolHandler struct { |
| 208 | + roles map[string]pool.Role |
| 209 | + rolesMutex sync.Mutex |
| 210 | +} |
| 211 | + |
| 212 | +func (h *initPoolHandler) Discovered(name string, conn *tarantool.Connection, |
| 213 | + role pool.Role) error { |
| 214 | + h.rolesMutex.Lock() |
| 215 | + h.roles[name] = role |
| 216 | + h.rolesMutex.Unlock() |
| 217 | + return nil |
| 218 | +} |
| 219 | + |
| 220 | +func (h *initPoolHandler) Deactivated(name string, conn *tarantool.Connection, |
| 221 | + role pool.Role) error { |
| 222 | + h.rolesMutex.Lock() |
| 223 | + delete(h.roles, name) |
| 224 | + h.rolesMutex.Unlock() |
| 225 | + return nil |
| 226 | +} |
| 227 | + |
205 | 228 | func SetClusterRO(instances []pool.Instance, roles []bool) error {
|
206 | 229 | if len(instances) != len(roles) {
|
207 | 230 | return fmt.Errorf("number of instances should be equal to number of roles")
|
208 | 231 | }
|
209 | 232 |
|
| 233 | + ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond) |
| 234 | + defer cancel() |
| 235 | + |
210 | 236 | for i, instance := range instances {
|
211 |
| - ctx, cancel := GetConnectContext() |
212 | 237 | err := SetInstanceRO(ctx, instance, roles[i])
|
213 | 238 | cancel()
|
214 | 239 | if err != nil {
|
215 | 240 | return err
|
216 | 241 | }
|
217 | 242 | }
|
218 | 243 |
|
219 |
| - return nil |
| 244 | + // if ctx.Done() { |
| 245 | + // return nil, ctx.Err() |
| 246 | + // } |
| 247 | + |
| 248 | + h := &initPoolHandler{} |
| 249 | + initPoolOpts := pool.Opts{ |
| 250 | + CheckTimeout: 100 * time.Microsecond, |
| 251 | + ConnectionHandler: h, |
| 252 | + } |
| 253 | + pool, err := pool.ConnectWithOpts(ctx, instances, initPoolOpts) |
| 254 | + if err != nil { |
| 255 | + return err |
| 256 | + } |
| 257 | + defer pool.Close() |
| 258 | + |
| 259 | + // Prepare roles map that |
| 260 | + expectedRoles := make(map[string]bool, len(roles)) |
| 261 | + for i, role := range roles { |
| 262 | + expectedRoles[instances[i].Name] = role |
| 263 | + } |
| 264 | + |
| 265 | + // Wait for the roles to be applied. |
| 266 | + for i := 0; i < 100; i++ { |
| 267 | + h.rolesMutex.Lock() |
| 268 | + if reflect.DeepEqual(h.roles, expectedRoles) { |
| 269 | + h.rolesMutex.Unlock() |
| 270 | + return nil |
| 271 | + } |
| 272 | + h.rolesMutex.Unlock() |
| 273 | + |
| 274 | + time.Sleep(100 * time.Millisecond) |
| 275 | + } |
| 276 | + return fmt.Errorf("Failed to wait roles are applied") |
220 | 277 | }
|
221 | 278 |
|
222 | 279 | func StartTarantoolInstances(instsOpts []StartOpts) ([]*TarantoolInstance, error) {
|
|
0 commit comments