11import asyncio
22import queue
33import threading
4+ import time
45from concurrent .futures import ThreadPoolExecutor
56from datetime import datetime
67from enum import Enum
@@ -54,7 +55,13 @@ class EventStream(EventStore):
5455 _thread_loops : dict [str , dict [str , asyncio .AbstractEventLoop ]]
5556 _write_page_cache : list [dict ]
5657
57- def __init__ (self , sid : str , file_store : FileStore , user_id : str | None = None ):
58+ def __init__ (
59+ self ,
60+ sid : str ,
61+ file_store : FileStore ,
62+ user_id : str | None = None ,
63+ max_delay_time : float = 0.5 ,
64+ ):
5865 super ().__init__ (sid , file_store , user_id )
5966 self ._stop_flag = threading .Event ()
6067 self ._queue : queue .Queue [Event ] = queue .Queue ()
@@ -68,6 +75,7 @@ def __init__(self, sid: str, file_store: FileStore, user_id: str | None = None):
6875 self ._lock = threading .Lock ()
6976 self .secrets = {}
7077 self ._write_page_cache = []
78+ self .max_delay_time = max_delay_time
7179
7280 def _init_thread_loop (self , subscriber_id : str , callback_id : str ) -> None :
7381 loop = asyncio .new_event_loop ()
@@ -230,6 +238,10 @@ async def _process_queue(self) -> None:
230238 try :
231239 event = self ._queue .get (timeout = 0.1 )
232240 except queue .Empty :
241+ if self .max_delay_time > 0 :
242+ # IMPORTANT: This is workaround when mutilple event stream active in the same time. Do not remove this.
243+ # More Info: https://www.notion.so/oraichain/20251206-Server-ch-m-sau-m-t-kho-ng-th-i-gian-ch-y-210248af329080b2b48dccca24cb91f1
244+ time .sleep (self .max_delay_time ) # noqa
233245 continue
234246
235247 # pass each event to each callback in order
0 commit comments