33import  java .util .ArrayList ;
44import  java .util .Collection ;
55import  java .util .List ;
6+ import  java .util .Map ;
67import  java .util .Set ;
78import  java .util .concurrent .atomic .AtomicBoolean ;
89import  java .util .concurrent .locks .Lock ;
2627import  redis .clients .jedis .util .IOUtils ;
2728
2829public  class  SentineledConnectionProvider  implements  ConnectionProvider  {
30+   class  PoolInfo  {
31+     public  String  host ;
32+     public  ConnectionPool  pool ;
33+ 
34+     public  PoolInfo (String  host , ConnectionPool  pool ) {
35+       this .host  = host ;
36+       this .pool  = pool ;
37+     }
38+   }
2939
3040  private  static  final  Logger  LOG  = LoggerFactory .getLogger (SentineledConnectionProvider .class );
3141
@@ -51,6 +61,10 @@ public class SentineledConnectionProvider implements ConnectionProvider {
5161
5262  private  final  Lock  initPoolLock  = new  ReentrantLock (true );
5363
64+   private  final  List <PoolInfo > slavePools  = new  ArrayList <>();
65+ 
66+   private  int  poolIndex ;
67+ 
5468  public  SentineledConnectionProvider (String  masterName , final  JedisClientConfig  masterClientConfig ,
5569      Set <HostAndPort > sentinels , final  JedisClientConfig  sentinelClientConfig ) {
5670    this (masterName , masterClientConfig , null , null , sentinels , sentinelClientConfig );
@@ -102,13 +116,52 @@ public SentineledConnectionProvider(String masterName, final JedisClientConfig m
102116    initMaster (master );
103117  }
104118
119+   private  Connection  getSlaveResource () {
120+     int  startIdx ;
121+     synchronized  (slavePools ) {
122+       poolIndex ++;
123+       if  (poolIndex  >= slavePools .size ()) {
124+         poolIndex  = 0 ;
125+       }
126+       startIdx  = poolIndex ;
127+     }
128+     return  _getSlaveResource (startIdx , 0 );
129+   }
130+ 
131+   private  Connection  _getSlaveResource (int  idx , int  cnt ) {
132+     PoolInfo  poolInfo ;
133+     synchronized  (slavePools ) {
134+       if  (cnt  >= slavePools .size ()) {
135+         return  null ;
136+       }
137+       poolInfo  = slavePools .get (idx  % slavePools .size ());
138+     }
139+     try  {
140+       Connection  jedis  = poolInfo .pool .getResource ();
141+       return  jedis ;
142+     } catch  (Exception  e ) {
143+       LOG .error ("get connection fail:" , e );
144+       return  _getSlaveResource (idx  + 1 , cnt  + 1 );
145+     }
146+   }
147+ 
105148  @ Override 
106149  public  Connection  getConnection () {
107150    return  pool .getResource ();
108151  }
109152
110153  @ Override 
111154  public  Connection  getConnection (CommandArguments  args ) {
155+     boolean  readCommand  = masterClientConfig .isReadCommand (args );
156+     if  (readCommand ) {
157+       Connection  slaveConn  = getSlaveResource ();
158+       if  (slaveConn  != null ) {
159+         return  slaveConn ;
160+       }
161+       if  (!masterClientConfig .isFallbackToMaster ()) {
162+         throw  new  JedisException ("can not get Connection, all slave is invalid" );
163+       }
164+     }
112165    return  pool .getResource ();
113166  }
114167
@@ -117,6 +170,10 @@ public void close() {
117170    sentinelListeners .forEach (SentinelListener ::shutdown );
118171
119172    pool .close ();
173+ 
174+     for  (PoolInfo  slavePool  : slavePools ) {
175+       slavePool .pool .close ();
176+     }
120177  }
121178
122179  public  HostAndPort  getCurrentMaster () {
@@ -167,6 +224,79 @@ private ConnectionPool createNodePool(HostAndPort master) {
167224    }
168225  }
169226
227+   private  void  initSlaves (List <HostAndPort > slaves ) {
228+     List <PoolInfo > removedSlavePools  = new  ArrayList <>();
229+     try  {
230+       synchronized  (slavePools ) {
231+         Loop :
232+         for  (int  i  = slavePools .size ()-1 ; i  >= 0 ; i --) {
233+           PoolInfo  poolInfo  = slavePools .get (i );
234+           for  (HostAndPort  slave  : slaves ) {
235+             String  host  = slave .toString ();
236+             if  (poolInfo .host .equals (host )) {
237+               continue  Loop ;
238+             }
239+           }
240+           removedSlavePools .add (slavePools .remove (i ));
241+         }
242+ 
243+         for  (HostAndPort  slave  : slaves ) {
244+           addSlave (slave );
245+         }
246+       }
247+     } finally  {
248+       if  (!removedSlavePools .isEmpty () && clientSideCache  != null ) {
249+         clientSideCache .flush ();
250+       }
251+ 
252+       for  (PoolInfo  removedSlavePool  : removedSlavePools ) {
253+         removedSlavePool .pool .destroy ();
254+       }
255+     }
256+   }
257+ 
258+   private  static  boolean  isHealthy (String  flags ) {
259+     for  (String  flag  : flags .split ("," )) {
260+       switch  (flag .trim ()) {
261+         case  "s_down" :
262+         case  "o_down" :
263+         case  "disconnected" :
264+           return  false ;
265+       }
266+     }
267+     return  true ;
268+   }
269+ 
270+   private  void  addSlave (HostAndPort  slave ) {
271+     String  newSlaveHost  = slave .toString ();
272+     synchronized  (this .slavePools ) {
273+       for  (int  i  = 0 ; i  < this .slavePools .size (); i ++) {
274+         PoolInfo  poolInfo  = this .slavePools .get (i );
275+         if  (poolInfo .host .equals (newSlaveHost )) {
276+           return ;
277+         }
278+       }
279+       slavePools .add (new  PoolInfo (newSlaveHost , createNodePool (slave )));
280+     }
281+   }
282+ 
283+   private  void  removeSlave (HostAndPort  slave ) {
284+     String  newSlaveHost  = slave .toString ();
285+     PoolInfo  removed  = null ;
286+     synchronized  (this .slavePools ) {
287+       for  (int  i  = 0 ; i  < this .slavePools .size (); i ++) {
288+         PoolInfo  poolInfo  = this .slavePools .get (i );
289+         if  (poolInfo .host .equals (newSlaveHost )) {
290+           removed  = slavePools .remove (i );
291+           break ;
292+         }
293+       }
294+     }
295+     if  (removed  != null ) {
296+       removed .pool .destroy ();
297+     }
298+   }
299+ 
170300  private  HostAndPort  initSentinels (Set <HostAndPort > sentinels ) {
171301
172302    HostAndPort  master  = null ;
@@ -262,6 +392,24 @@ public void run() {
262392
263393          sentinelJedis  = new  Jedis (node , sentinelClientConfig );
264394
395+           List <Map <String , String >> slaveInfos  = sentinelJedis .sentinelSlaves (masterName );
396+ 
397+           List <HostAndPort > slaves  = new  ArrayList <>();
398+ 
399+           for  (int  i  = 0 ; i  < slaveInfos .size (); i ++) {
400+             Map <String , String > slaveInfo  = slaveInfos .get (i );
401+             String  flags  = slaveInfo .get ("flags" );
402+             if  (flags  == null  || !isHealthy (flags )) {
403+               continue ;
404+             }
405+             String  ip  = slaveInfo .get ("ip" );
406+             int  port  = Integer .parseInt (slaveInfo .get ("port" ));
407+             HostAndPort  slave  = new  HostAndPort (ip , port );
408+             slaves .add (slave );
409+           }
410+ 
411+           initSlaves (slaves );
412+ 
265413          // code for active refresh 
266414          List <String > masterAddr  = sentinelJedis .sentinelGetMasterAddrByName (masterName );
267415          if  (masterAddr  == null  || masterAddr .size () != 2 ) {
@@ -275,24 +423,58 @@ public void run() {
275423            public  void  onMessage (String  channel , String  message ) {
276424              LOG .debug ("Sentinel {} published: {}." , node , message );
277425
278-               String [] switchMasterMsg  = message .split (" " );
279- 
280-               if  (switchMasterMsg .length  > 3 ) {
281- 
282-                 if  (masterName .equals (switchMasterMsg [0 ])) {
283-                   initMaster (toHostAndPort (switchMasterMsg [3 ], switchMasterMsg [4 ]));
284-                 } else  {
285-                   LOG .debug (
286-                       "Ignoring message on +switch-master for master {}. Our master is {}." ,
287-                       switchMasterMsg [0 ], masterName );
288-                 }
289- 
290-               } else  {
291-                 LOG .error ("Invalid message received on sentinel {} on channel +switch-master: {}." ,
292-                     node , message );
426+               String [] switchMsg  = message .split (" " );
427+               String  slaveIp ;
428+               int  slavePort ;
429+               switch  (channel ) {
430+                 case  "+switch-master" :
431+                   if  (switchMsg .length  > 3 ) {
432+                     if  (masterName .equals (switchMsg [0 ])) {
433+                       initMaster (toHostAndPort (switchMsg [3 ], switchMsg [4 ]));
434+                     } else  {
435+                       LOG .debug (
436+                               "Ignoring message on +switch-master for master {}. Our master is {}." ,
437+                               switchMsg [0 ], masterName );
438+                     }
439+                   } else  {
440+                     LOG .error ("Invalid message received on sentinel {} on channel +switch-master: {}." ,
441+                             node , message );
442+                   }
443+                   break ;
444+                 case  "+sdown" :
445+                   if  (switchMsg [0 ].equals ("master" )) {
446+                     return ;
447+                   }
448+                   if  (!masterName .equals (switchMsg [5 ])) {
449+                     return ;
450+                   }
451+                   slaveIp  = switchMsg [2 ];
452+                   slavePort  = Integer .parseInt (switchMsg [3 ]);
453+                   removeSlave (new  HostAndPort (slaveIp , slavePort ));
454+                   break ;
455+                 case  "-sdown" :
456+                   if  (!masterName .equals (switchMsg [5 ])) {
457+                     return ;
458+                   }
459+                   slaveIp  = switchMsg [2 ];
460+                   slavePort  = Integer .parseInt (switchMsg [3 ]);
461+                   addSlave (new  HostAndPort (slaveIp , slavePort ));
462+                   break ;
463+                 case  "+slave" :
464+                   if  (!masterName .equals (switchMsg [5 ])) {
465+                     return ;
466+                   }
467+                   slaveIp  = switchMsg [2 ];
468+                   slavePort  = Integer .parseInt (switchMsg [3 ]);
469+                   addSlave (new  HostAndPort (slaveIp , slavePort ));
470+ 
471+                   String  masterIp  = switchMsg [6 ];
472+                   int  masterPort  = Integer .parseInt (switchMsg [7 ]);
473+                   removeSlave (new  HostAndPort (masterIp , masterPort ));
474+                   break ;
293475              }
294476            }
295-           }, "+switch-master" );
477+           }, "+switch-master" ,  "+sdown" ,  "-sdown" ,  "+slave" );
296478
297479        } catch  (JedisException  e ) {
298480
0 commit comments