@@ -15,6 +15,7 @@ import { createModuleLogger } from 'lib0/logging'
1515import  toobusy  from  'toobusy-js' 
1616import  {  promiseWithResolvers  }  from  './utils.js' 
1717import  {  ClientClosedError  }  from  'redis' 
18+ import  {  randomUUID  }  from  'crypto' 
1819
1920const  logSocketIO  =  createModuleLogger ( '@y/socket-io/server' ) 
2021const  PERSIST_INTERVAL  =  number . parseInt ( env . getConf ( 'y-socket-io-server-persist-interval' )  ||  '3000' ) 
@@ -24,6 +25,7 @@ const WORKER_DISABLED = env.getConf('y-worker-disabled') === 'true'
2425const  DEFAULT_CLEAR_TIMEOUT  =  number . parseInt ( env . getConf ( 'y-socket-io-default-clear-timeout' )  ||  '30000' ) 
2526const  WORKER_HEALTH_CHECK_INTERVAL  =  number . parseInt ( env . getConf ( 'y-socket-io-worker-health-check-interval' )  ||  '5000' ) 
2627const  NEVER_REJECT_CONNECTION  =  env . getConf ( 'y-socket-io-never-reject-connection' )  ===  'true' 
28+ const  PERSIST_LEADER_HEARTBEAT_INTERVAL  =  number . parseInt ( env . getConf ( 'y-socket-io-server-persist-leader-heartbeat-interval' )  ||  '5000' ) 
2729
2830process . on ( 'SIGINT' ,  function  ( )  { 
2931  // calling .shutdown allows your process to exit normally 
@@ -169,6 +171,21 @@ export class YSocketIO {
169171   * @private  
170172   */ 
171173  persistWorkerHealthCheckTimeout  =  null 
174+   /** 
175+    * @type  {NodeJS.Timeout | null } 
176+    * @private  
177+    */ 
178+   persistentHeartbeatTimeout  =  null 
179+   /** 
180+    * @type  {Set<string> } 
181+    * @private  
182+    */ 
183+   persistentLeaderOf  =  new  Set ( ) 
184+   /** 
185+    * @type  {string } 
186+    * @private  
187+    */ 
188+   serverId  =  randomUUID ( ) 
172189
173190  /** 
174191   * YSocketIO constructor. 
@@ -210,6 +227,8 @@ export class YSocketIO {
210227      this . registerPersistWorkerHealthCheck ( ) 
211228    } 
212229
230+     this . registerPersistentLeaderHeartbeat ( ) 
231+ 
213232    this . nsp  =  this . io . of ( / ^ \/ y j s \| .* $ / ) 
214233
215234    this . nsp . use ( async  ( socket ,  next )  =>  { 
@@ -286,6 +305,7 @@ export class YSocketIO {
286305          this . subscriber ?. ensureSubId ( stream ,  doc . redisLastId ) 
287306        } 
288307        this . startSynchronization ( socket ,  doc ) 
308+         await  this . tryAcquirePersistentLeader ( namespace ) 
289309      } ) ( ) 
290310    } ) 
291311
@@ -394,6 +414,7 @@ export class YSocketIO {
394414        if  ( nsp ?. sockets . size  ===  0  &&  stream )  { 
395415          this . cleanupNamespace ( ns ,  stream ,  DEFAULT_CLEAR_TIMEOUT ) 
396416          if  ( this . namespaceDocMap . has ( ns ) )  this . debouncedPersist ( ns ,  true ) 
417+           this . persistentLeaderOf . delete ( ns ) 
397418        } 
398419        logSocketIO ( `disconnecting socket in ${ ns } ${ nsp ?. sockets . size  ||  0 }  ) 
399420      } 
@@ -540,10 +561,13 @@ export class YSocketIO {
540561        // are all synchronize operations 
541562        this . debouncedPersistMap . delete ( namespace ) 
542563
564+         const  isLeader  =  await  this . tryAcquirePersistentLeader ( namespace ) 
565+         if  ( ! isLeader )  return 
566+ 
543567        try  { 
544568          assert ( this . client ) 
545569          const  doc  =  this . namespaceDocMap . get ( namespace ) ?. ydoc 
546-           logSocketIO ( `trying to persist ${ namespace }  ) 
570+           logSocketIO ( `trying to persist ${ namespace }  in [SID:  ${ this . serverId } ] ` ) 
547571          if  ( ! doc )  return 
548572          if  ( this . persistWorker  &&  this . workerReady )  { 
549573            /** @type  {ReturnType<typeof promiseWithResolvers<void>> } */ 
@@ -659,6 +683,9 @@ export class YSocketIO {
659683      if  ( this . persistWorkerHealthCheckTimeout )  { 
660684        clearInterval ( this . persistWorkerHealthCheckTimeout ) 
661685      } 
686+       if  ( this . persistentHeartbeatTimeout )  { 
687+         clearTimeout ( this . persistentHeartbeatTimeout ) 
688+       } 
662689      this . subscriber ?. destroy ( ) 
663690      return  this . client ?. destroy ( ) 
664691    }  catch  ( e )  { 
@@ -767,4 +794,78 @@ export class YSocketIO {
767794    } 
768795    return  health 
769796  } 
797+ 
798+   /** 
799+    * @param  {string } namespace 
800+   */ 
801+   getLeaderKeyOf  ( namespace )  { 
802+     assert ( this . client ) 
803+     return  `${ this . client . prefix } ${ namespace }  
804+   } 
805+ 
806+   async  registerPersistentLeaderHeartbeat  ( )  { 
807+     this . persistentHeartbeatTimeout  =  setTimeout ( async  ( )  =>  { 
808+       assert ( this . client ) 
809+       const  redis  =  this . client . redis 
810+ 
811+       try  { 
812+         /** 
813+          * @type  {Array<Promise<any>> } 
814+          */ 
815+         const  promises  =  [ ] 
816+         for  ( const  namespace  of  this . persistentLeaderOf )  { 
817+           const  key  =  this . getLeaderKeyOf ( namespace ) 
818+           const  curLeader  =  await  redis . get ( key ) 
819+ 
820+           // remove orphaned if exist 
821+           const  aliveClients  =  this . namespaceMap . get ( namespace ) ?. sockets . size  ||  0 
822+           if  ( aliveClients  ===  0 )  { 
823+             logSocketIO ( `clearing leader heartbeat for [${ namespace } ${ this . serverId }  ) 
824+             this . persistentLeaderOf . delete ( namespace ) 
825+             continue 
826+           } 
827+ 
828+           if  ( curLeader  ===  this . serverId )  { 
829+             logSocketIO ( `set leader heartbeat for [${ namespace } ${ this . serverId }  ) 
830+             promises . push ( 
831+               redis . set ( key ,  this . serverId ,  { 
832+                 XX : true , 
833+                 PX : PERSIST_LEADER_HEARTBEAT_INTERVAL 
834+               } ) 
835+             ) 
836+           }  else  { 
837+             logSocketIO ( `lost leadership for [${ namespace } ${ this . serverId }  ) 
838+             this . persistentLeaderOf . delete ( namespace ) 
839+           } 
840+         } 
841+ 
842+         await  promise . all ( promises ) 
843+       }  catch  ( e )  { 
844+         console . error ( e ) 
845+       } 
846+ 
847+       // register next round 
848+       this . persistentHeartbeatTimeout  =  setTimeout ( 
849+         ( )  =>  this . registerPersistentLeaderHeartbeat ( ) , 
850+         PERSIST_LEADER_HEARTBEAT_INTERVAL  /  2 
851+       ) 
852+     } ) 
853+   } 
854+ 
855+   /** 
856+    * @param  {string } namespace 
857+    */ 
858+   async  tryAcquirePersistentLeader  ( namespace )  { 
859+     assert ( this . client ) 
860+     const  redis  =  this . client . redis 
861+     const  key  =  this . getLeaderKeyOf ( namespace ) 
862+     const  ok  =  await  redis . set ( key ,  this . serverId ,  { 
863+       NX : true , 
864+       PX : PERSIST_LEADER_HEARTBEAT_INTERVAL 
865+     } ) 
866+     if  ( ! ok )  return  false 
867+ 
868+     this . persistentLeaderOf . add ( namespace ) 
869+     return  true 
870+   } 
770871} 
0 commit comments