Skip to content

Commit af9f2f3

Browse files
committed
Remove 10min timeout from topic stream
1 parent 3706262 commit af9f2f3

File tree

1 file changed

+30
-3
lines changed

1 file changed

+30
-3
lines changed

ydb/_grpc/grpcwrapper/common_utils.py

Lines changed: 30 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,10 @@
3434
from ..common.protos import ydb_topic_pb2, ydb_issue_message_pb2
3535

3636
from ... import issues, connection
37+
from ...settings import BaseRequestSettings
38+
39+
40+
DEFAULT_LONG_TIMEOUT = 31536000 # year
3741

3842

3943
class IFromProto(abc.ABC):
@@ -161,6 +165,13 @@ def __init__(self, convert_server_grpc_to_wrapper):
161165
self._stream_call = None
162166
self._wait_executor = None
163167

168+
self._stream_settings: BaseRequestSettings = (
169+
BaseRequestSettings()
170+
.with_operation_timeout(DEFAULT_LONG_TIMEOUT)
171+
.with_cancel_after(DEFAULT_LONG_TIMEOUT)
172+
.with_timeout(DEFAULT_LONG_TIMEOUT)
173+
)
174+
164175
def __del__(self):
165176
self._clean_executor(wait=False)
166177

@@ -188,6 +199,7 @@ async def _start_asyncio_driver(self, driver: DriverIO, stub, method):
188199
requests_iterator,
189200
stub,
190201
method,
202+
settings=self._stream_settings,
191203
)
192204
self._stream_call = stream_call
193205
self.from_server_grpc = stream_call.__aiter__()
@@ -196,14 +208,29 @@ async def _start_sync_driver(self, driver: Driver, stub, method):
196208
requests_iterator = AsyncQueueToSyncIteratorAsyncIO(self.from_client_grpc)
197209
self._wait_executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
198210

199-
stream_call = await to_thread(driver, requests_iterator, stub, method, executor=self._wait_executor)
211+
stream_call = await to_thread(
212+
driver,
213+
requests_iterator,
214+
stub,
215+
method,
216+
executor=self._wait_executor,
217+
settings=self._stream_settings,
218+
)
200219
self._stream_call = stream_call
201220
self.from_server_grpc = SyncToAsyncIterator(stream_call.__iter__(), self._wait_executor)
202221

203-
async def receive(self) -> Any:
222+
async def receive(self, timeout: Optional[int] = None) -> Any:
204223
# todo handle grpc exceptions and convert it to internal exceptions
205224
try:
206-
grpc_message = await self.from_server_grpc.__anext__()
225+
if timeout is None:
226+
grpc_message = await self.from_server_grpc.__anext__()
227+
else:
228+
229+
async def get_response():
230+
return await self.from_server_grpc.__anext__()
231+
232+
grpc_message = await asyncio.wait_for(get_response(), timeout)
233+
207234
except (grpc.RpcError, grpc.aio.AioRpcError) as e:
208235
raise connection._rpc_error_handler(self._connection_state, e)
209236

0 commit comments

Comments
 (0)