Skip to content

Commit dd613e4

Browse files
committed
Remove 10min timeout from topic stream
1 parent 3706262 commit dd613e4

File tree

6 files changed

+132
-8
lines changed

6 files changed

+132
-8
lines changed

ydb/_grpc/grpcwrapper/common_utils.py

Lines changed: 31 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,10 @@
3434
from ..common.protos import ydb_topic_pb2, ydb_issue_message_pb2
3535

3636
from ... import issues, connection
37+
from ...settings import BaseRequestSettings
38+
39+
40+
DEFAULT_LONG_TIMEOUT = 31536000 # year
3741

3842

3943
class IFromProto(abc.ABC):
@@ -131,7 +135,7 @@ async def __anext__(self):
131135

132136
class IGrpcWrapperAsyncIO(abc.ABC):
133137
@abc.abstractmethod
134-
async def receive(self) -> Any:
138+
async def receive(self, timeout: Optional[int] = None) -> Any:
135139
...
136140

137141
@abc.abstractmethod
@@ -161,6 +165,13 @@ def __init__(self, convert_server_grpc_to_wrapper):
161165
self._stream_call = None
162166
self._wait_executor = None
163167

168+
self._stream_settings: BaseRequestSettings = (
169+
BaseRequestSettings()
170+
.with_operation_timeout(DEFAULT_LONG_TIMEOUT)
171+
.with_cancel_after(DEFAULT_LONG_TIMEOUT)
172+
.with_timeout(DEFAULT_LONG_TIMEOUT)
173+
)
174+
164175
def __del__(self):
165176
self._clean_executor(wait=False)
166177

@@ -188,6 +199,7 @@ async def _start_asyncio_driver(self, driver: DriverIO, stub, method):
188199
requests_iterator,
189200
stub,
190201
method,
202+
settings=self._stream_settings,
191203
)
192204
self._stream_call = stream_call
193205
self.from_server_grpc = stream_call.__aiter__()
@@ -196,14 +208,29 @@ async def _start_sync_driver(self, driver: Driver, stub, method):
196208
requests_iterator = AsyncQueueToSyncIteratorAsyncIO(self.from_client_grpc)
197209
self._wait_executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
198210

199-
stream_call = await to_thread(driver, requests_iterator, stub, method, executor=self._wait_executor)
211+
stream_call = await to_thread(
212+
driver,
213+
requests_iterator,
214+
stub,
215+
method,
216+
executor=self._wait_executor,
217+
settings=self._stream_settings,
218+
)
200219
self._stream_call = stream_call
201220
self.from_server_grpc = SyncToAsyncIterator(stream_call.__iter__(), self._wait_executor)
202221

203-
async def receive(self) -> Any:
222+
async def receive(self, timeout: Optional[int] = None) -> Any:
204223
# todo handle grpc exceptions and convert it to internal exceptions
205224
try:
206-
grpc_message = await self.from_server_grpc.__anext__()
225+
if timeout is None:
226+
grpc_message = await self.from_server_grpc.__anext__()
227+
else:
228+
229+
async def get_response():
230+
return await self.from_server_grpc.__anext__()
231+
232+
grpc_message = await asyncio.wait_for(get_response(), timeout)
233+
207234
except (grpc.RpcError, grpc.aio.AioRpcError) as e:
208235
raise connection._rpc_error_handler(self._connection_state, e)
209236

ydb/_topic_common/test_helpers.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ def __init__(self):
1515
self.from_client = asyncio.Queue()
1616
self._closed = False
1717

18-
async def receive(self) -> typing.Any:
18+
async def receive(self, timeout: typing.Optional[int] = None) -> typing.Any:
1919
if self._closed:
2020
raise Exception("read from closed StreamMock")
2121

ydb/_topic_reader/topic_reader_asyncio.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -490,7 +490,11 @@ async def _start(self, stream: IGrpcWrapperAsyncIO, init_message: StreamReadMess
490490
logger.debug("reader stream %s send init request", self._id)
491491

492492
stream.write(StreamReadMessage.FromClient(client_message=init_message))
493-
init_response = await stream.receive() # type: StreamReadMessage.FromServer
493+
try:
494+
init_response = await stream.receive(timeout=10) # type: StreamReadMessage.FromServer
495+
except asyncio.TimeoutError:
496+
raise TopicReaderError("Timeout waiting for init response")
497+
494498
if isinstance(init_response.server_message, StreamReadMessage.InitResponse):
495499
self._session_id = init_response.server_message.session_id
496500
logger.debug("reader stream %s initialized session=%s", self._id, self._session_id)

ydb/_topic_reader/topic_reader_asyncio_test.py

Lines changed: 41 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
from . import datatypes, topic_reader_asyncio
1515
from .datatypes import PublicBatch, PublicMessage
1616
from .topic_reader import PublicReaderSettings
17-
from .topic_reader_asyncio import ReaderStream, ReaderReconnector
17+
from .topic_reader_asyncio import ReaderStream, ReaderReconnector, TopicReaderError
1818
from .._grpc.grpcwrapper.common_utils import SupportedDriverType, ServerStatus
1919
from .._grpc.grpcwrapper.ydb_topic import (
2020
StreamReadMessage,
@@ -1475,6 +1475,46 @@ def logged():
14751475

14761476
await wait_condition(logged)
14771477

1478+
async def test_init_timeout_parameter(self, stream, default_reader_settings):
1479+
"""Test that ReaderStream._start calls stream.receive with timeout=10"""
1480+
reader = ReaderStream(self.default_reader_reconnector_id, default_reader_settings)
1481+
init_message = default_reader_settings._init_message()
1482+
1483+
# Mock stream.receive to check if timeout=10 is passed
1484+
with mock.patch.object(stream, 'receive') as mock_receive:
1485+
mock_receive.return_value = StreamReadMessage.FromServer(
1486+
server_status=ServerStatus(ydb_status_codes_pb2.StatusIds.SUCCESS, []),
1487+
server_message=StreamReadMessage.InitResponse(session_id="test_session")
1488+
)
1489+
1490+
await reader._start(stream, init_message)
1491+
1492+
# Verify that receive was called with timeout=10
1493+
mock_receive.assert_called_with(timeout=10)
1494+
1495+
await reader.close(False)
1496+
1497+
async def test_init_timeout_behavior(self, stream, default_reader_settings):
1498+
"""Test that ReaderStream._start raises TopicReaderError when receive times out"""
1499+
reader = ReaderStream(self.default_reader_reconnector_id, default_reader_settings)
1500+
init_message = default_reader_settings._init_message()
1501+
1502+
# Mock stream.receive to directly raise TimeoutError when called with timeout=10
1503+
async def timeout_receive(timeout=None):
1504+
if timeout == 10:
1505+
raise asyncio.TimeoutError("Simulated timeout")
1506+
return StreamReadMessage.FromServer(
1507+
server_status=ServerStatus(ydb_status_codes_pb2.StatusIds.SUCCESS, []),
1508+
server_message=StreamReadMessage.InitResponse(session_id="test_session")
1509+
)
1510+
1511+
with mock.patch.object(stream, 'receive', side_effect=timeout_receive):
1512+
# Should raise TopicReaderError with timeout message
1513+
with pytest.raises(TopicReaderError, match="Timeout waiting for init response"):
1514+
await reader._start(stream, init_message)
1515+
1516+
await reader.close(False)
1517+
14781518

14791519
@pytest.mark.asyncio
14801520
class TestReaderReconnector:

ydb/_topic_writer/topic_writer_asyncio.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -799,7 +799,11 @@ async def _start(self, stream: IGrpcWrapperAsyncIO, init_message: StreamWriteMes
799799
logger.debug("writer stream %s send init request", self._id)
800800
stream.write(StreamWriteMessage.FromClient(init_message))
801801

802-
resp = await stream.receive()
802+
try:
803+
resp = await stream.receive(timeout=10)
804+
except asyncio.TimeoutError:
805+
raise TopicWriterError("Timeout waiting for init response")
806+
803807
self._ensure_ok(resp)
804808
if not isinstance(resp, StreamWriteMessage.InitResponse):
805809
raise TopicWriterError("Unexpected answer for init request: %s" % resp)

ydb/_topic_writer/topic_writer_asyncio_test.py

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -231,6 +231,55 @@ async def test_update_token(self, stream: StreamMock):
231231

232232
await writer.close()
233233

234+
async def test_init_timeout_parameter(self, stream):
235+
"""Test that WriterAsyncIOStream._start calls stream.receive with timeout=10"""
236+
writer_id = 1
237+
settings = WriterSettings(PublicWriterSettings("test-topic", "test-producer"))
238+
239+
# Mock stream.receive to check if timeout=10 is passed
240+
with mock.patch.object(stream, 'receive') as mock_receive:
241+
mock_receive.return_value = StreamWriteMessage.InitResponse(
242+
last_seq_no=0,
243+
session_id="test_session",
244+
partition_id=1,
245+
supported_codecs=[Codec.CODEC_RAW],
246+
status=ServerStatus(StatusCode.SUCCESS, [])
247+
)
248+
249+
writer = WriterAsyncIOStream(writer_id, settings)
250+
await writer._start(stream, settings.create_init_request())
251+
252+
# Verify that receive was called with timeout=10
253+
mock_receive.assert_called_with(timeout=10)
254+
255+
await writer.close()
256+
257+
async def test_init_timeout_behavior(self, stream):
258+
"""Test that WriterAsyncIOStream._start raises TopicWriterError when receive times out"""
259+
writer_id = 1
260+
settings = WriterSettings(PublicWriterSettings("test-topic", "test-producer"))
261+
262+
# Mock stream.receive to directly raise TimeoutError when called with timeout=10
263+
async def timeout_receive(timeout=None):
264+
if timeout == 10:
265+
raise asyncio.TimeoutError("Simulated timeout")
266+
return StreamWriteMessage.InitResponse(
267+
last_seq_no=0,
268+
session_id="test_session",
269+
partition_id=1,
270+
supported_codecs=[Codec.CODEC_RAW],
271+
status=ServerStatus(StatusCode.SUCCESS, [])
272+
)
273+
274+
with mock.patch.object(stream, 'receive', side_effect=timeout_receive):
275+
writer = WriterAsyncIOStream(writer_id, settings)
276+
277+
# Should raise TopicWriterError with timeout message
278+
with pytest.raises(TopicWriterError, match="Timeout waiting for init response"):
279+
await writer._start(stream, settings.create_init_request())
280+
281+
# Don't close writer since _start failed and _stream was never set
282+
234283

235284
@pytest.mark.asyncio
236285
class TestWriterAsyncIOReconnector:

0 commit comments

Comments
 (0)