Skip to content

Commit be5055f

Browse files
committed
Remove 10min timeout from topic stream
1 parent 3706262 commit be5055f

File tree

9 files changed

+151
-18
lines changed

9 files changed

+151
-18
lines changed

ydb/_constants.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
DEFAULT_INITIAL_RESPONSE_TIMEOUT = 600
2+
DEFAULT_LONG_STREAM_TIMEOUT = 31536000 # year

ydb/_grpc/grpcwrapper/common_utils.py

Lines changed: 29 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@
3434
from ..common.protos import ydb_topic_pb2, ydb_issue_message_pb2
3535

3636
from ... import issues, connection
37+
from ...settings import BaseRequestSettings
38+
from ..._constants import DEFAULT_LONG_STREAM_TIMEOUT
3739

3840

3941
class IFromProto(abc.ABC):
@@ -131,7 +133,7 @@ async def __anext__(self):
131133

132134
class IGrpcWrapperAsyncIO(abc.ABC):
133135
@abc.abstractmethod
134-
async def receive(self) -> Any:
136+
async def receive(self, timeout: Optional[int] = None) -> Any:
135137
...
136138

137139
@abc.abstractmethod
@@ -161,6 +163,13 @@ def __init__(self, convert_server_grpc_to_wrapper):
161163
self._stream_call = None
162164
self._wait_executor = None
163165

166+
self._stream_settings: BaseRequestSettings = (
167+
BaseRequestSettings()
168+
.with_operation_timeout(DEFAULT_LONG_STREAM_TIMEOUT)
169+
.with_cancel_after(DEFAULT_LONG_STREAM_TIMEOUT)
170+
.with_timeout(DEFAULT_LONG_STREAM_TIMEOUT)
171+
)
172+
164173
def __del__(self):
165174
self._clean_executor(wait=False)
166175

@@ -188,6 +197,7 @@ async def _start_asyncio_driver(self, driver: DriverIO, stub, method):
188197
requests_iterator,
189198
stub,
190199
method,
200+
settings=self._stream_settings,
191201
)
192202
self._stream_call = stream_call
193203
self.from_server_grpc = stream_call.__aiter__()
@@ -196,14 +206,29 @@ async def _start_sync_driver(self, driver: Driver, stub, method):
196206
requests_iterator = AsyncQueueToSyncIteratorAsyncIO(self.from_client_grpc)
197207
self._wait_executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
198208

199-
stream_call = await to_thread(driver, requests_iterator, stub, method, executor=self._wait_executor)
209+
stream_call = await to_thread(
210+
driver,
211+
requests_iterator,
212+
stub,
213+
method,
214+
executor=self._wait_executor,
215+
settings=self._stream_settings,
216+
)
200217
self._stream_call = stream_call
201218
self.from_server_grpc = SyncToAsyncIterator(stream_call.__iter__(), self._wait_executor)
202219

203-
async def receive(self) -> Any:
220+
async def receive(self, timeout: Optional[int] = None) -> Any:
204221
# todo handle grpc exceptions and convert it to internal exceptions
205222
try:
206-
grpc_message = await self.from_server_grpc.__anext__()
223+
if timeout is None:
224+
grpc_message = await self.from_server_grpc.__anext__()
225+
else:
226+
227+
async def get_response():
228+
return await self.from_server_grpc.__anext__()
229+
230+
grpc_message = await asyncio.wait_for(get_response(), timeout)
231+
207232
except (grpc.RpcError, grpc.aio.AioRpcError) as e:
208233
raise connection._rpc_error_handler(self._connection_state, e)
209234

ydb/_topic_common/test_helpers.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ def __init__(self):
1515
self.from_client = asyncio.Queue()
1616
self._closed = False
1717

18-
async def receive(self) -> typing.Any:
18+
async def receive(self, timeout: typing.Optional[int] = None) -> typing.Any:
1919
if self._closed:
2020
raise Exception("read from closed StreamMock")
2121

ydb/_topic_reader/topic_reader_asyncio.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@
3838
if typing.TYPE_CHECKING:
3939
from ..query.transaction import BaseQueryTxContext
4040

41+
from .._constants import DEFAULT_INITIAL_RESPONSE_TIMEOUT
42+
4143
logger = logging.getLogger(__name__)
4244

4345

@@ -490,7 +492,13 @@ async def _start(self, stream: IGrpcWrapperAsyncIO, init_message: StreamReadMess
490492
logger.debug("reader stream %s send init request", self._id)
491493

492494
stream.write(StreamReadMessage.FromClient(client_message=init_message))
493-
init_response = await stream.receive() # type: StreamReadMessage.FromServer
495+
try:
496+
init_response = await stream.receive(
497+
timeout=DEFAULT_INITIAL_RESPONSE_TIMEOUT
498+
) # type: StreamReadMessage.FromServer
499+
except asyncio.TimeoutError:
500+
raise TopicReaderError("Timeout waiting for init response")
501+
494502
if isinstance(init_response.server_message, StreamReadMessage.InitResponse):
495503
self._session_id = init_response.server_message.session_id
496504
logger.debug("reader stream %s initialized session=%s", self._id, self._session_id)

ydb/_topic_reader/topic_reader_asyncio_test.py

Lines changed: 43 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
from . import datatypes, topic_reader_asyncio
1515
from .datatypes import PublicBatch, PublicMessage
1616
from .topic_reader import PublicReaderSettings
17-
from .topic_reader_asyncio import ReaderStream, ReaderReconnector
17+
from .topic_reader_asyncio import ReaderStream, ReaderReconnector, TopicReaderError
1818
from .._grpc.grpcwrapper.common_utils import SupportedDriverType, ServerStatus
1919
from .._grpc.grpcwrapper.ydb_topic import (
2020
StreamReadMessage,
@@ -36,6 +36,8 @@
3636
else:
3737
from .._grpc.common.protos import ydb_status_codes_pb2
3838

39+
from .._constants import DEFAULT_INITIAL_RESPONSE_TIMEOUT
40+
3941

4042
@pytest.fixture(autouse=True)
4143
def handle_exceptions(event_loop):
@@ -1475,6 +1477,46 @@ def logged():
14751477

14761478
await wait_condition(logged)
14771479

1480+
async def test_init_timeout_parameter(self, stream, default_reader_settings):
1481+
"""Test that ReaderStream._start calls stream.receive with timeout=10"""
1482+
reader = ReaderStream(self.default_reader_reconnector_id, default_reader_settings)
1483+
init_message = default_reader_settings._init_message()
1484+
1485+
# Mock stream.receive to check if timeout is passed
1486+
with mock.patch.object(stream, "receive") as mock_receive:
1487+
mock_receive.return_value = StreamReadMessage.FromServer(
1488+
server_status=ServerStatus(ydb_status_codes_pb2.StatusIds.SUCCESS, []),
1489+
server_message=StreamReadMessage.InitResponse(session_id="test_session"),
1490+
)
1491+
1492+
await reader._start(stream, init_message)
1493+
1494+
# Verify that receive was called with timeout
1495+
mock_receive.assert_called_with(timeout=DEFAULT_INITIAL_RESPONSE_TIMEOUT)
1496+
1497+
await reader.close(False)
1498+
1499+
async def test_init_timeout_behavior(self, stream, default_reader_settings):
1500+
"""Test that ReaderStream._start raises TopicReaderError when receive times out"""
1501+
reader = ReaderStream(self.default_reader_reconnector_id, default_reader_settings)
1502+
init_message = default_reader_settings._init_message()
1503+
1504+
# Mock stream.receive to directly raise TimeoutError when called with timeout
1505+
async def timeout_receive(timeout=None):
1506+
if timeout == DEFAULT_INITIAL_RESPONSE_TIMEOUT:
1507+
raise asyncio.TimeoutError("Simulated timeout")
1508+
return StreamReadMessage.FromServer(
1509+
server_status=ServerStatus(ydb_status_codes_pb2.StatusIds.SUCCESS, []),
1510+
server_message=StreamReadMessage.InitResponse(session_id="test_session"),
1511+
)
1512+
1513+
with mock.patch.object(stream, "receive", side_effect=timeout_receive):
1514+
# Should raise TopicReaderError with timeout message
1515+
with pytest.raises(TopicReaderError, match="Timeout waiting for init response"):
1516+
await reader._start(stream, init_message)
1517+
1518+
await reader.close(False)
1519+
14781520

14791521
@pytest.mark.asyncio
14801522
class TestReaderReconnector:

ydb/_topic_writer/topic_writer_asyncio.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,8 @@
4949
if typing.TYPE_CHECKING:
5050
from ..query.transaction import BaseQueryTxContext
5151

52+
from .._constants import DEFAULT_INITIAL_RESPONSE_TIMEOUT
53+
5254
logger = logging.getLogger(__name__)
5355

5456

@@ -799,7 +801,11 @@ async def _start(self, stream: IGrpcWrapperAsyncIO, init_message: StreamWriteMes
799801
logger.debug("writer stream %s send init request", self._id)
800802
stream.write(StreamWriteMessage.FromClient(init_message))
801803

802-
resp = await stream.receive()
804+
try:
805+
resp = await stream.receive(timeout=DEFAULT_INITIAL_RESPONSE_TIMEOUT)
806+
except asyncio.TimeoutError:
807+
raise TopicWriterError("Timeout waiting for init response")
808+
803809
self._ensure_ok(resp)
804810
if not isinstance(resp, StreamWriteMessage.InitResponse):
805811
raise TopicWriterError("Unexpected answer for init request: %s" % resp)

ydb/_topic_writer/topic_writer_asyncio_test.py

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,8 @@
4343

4444
from ..credentials import AnonymousCredentials
4545

46+
from .._constants import DEFAULT_INITIAL_RESPONSE_TIMEOUT
47+
4648

4749
FAKE_TRANSACTION_IDENTITY = TransactionIdentity(
4850
tx_id="transaction_id",
@@ -231,6 +233,55 @@ async def test_update_token(self, stream: StreamMock):
231233

232234
await writer.close()
233235

236+
async def test_init_timeout_parameter(self, stream):
237+
"""Test that WriterAsyncIOStream._start calls stream.receive with timeout=10"""
238+
writer_id = 1
239+
settings = WriterSettings(PublicWriterSettings("test-topic", "test-producer"))
240+
241+
# Mock stream.receive to check if timeout is passed
242+
with mock.patch.object(stream, "receive") as mock_receive:
243+
mock_receive.return_value = StreamWriteMessage.InitResponse(
244+
last_seq_no=0,
245+
session_id="test_session",
246+
partition_id=1,
247+
supported_codecs=[Codec.CODEC_RAW],
248+
status=ServerStatus(StatusCode.SUCCESS, []),
249+
)
250+
251+
writer = WriterAsyncIOStream(writer_id, settings)
252+
await writer._start(stream, settings.create_init_request())
253+
254+
# Verify that receive was called with timeout
255+
mock_receive.assert_called_with(timeout=DEFAULT_INITIAL_RESPONSE_TIMEOUT)
256+
257+
await writer.close()
258+
259+
async def test_init_timeout_behavior(self, stream):
260+
"""Test that WriterAsyncIOStream._start raises TopicWriterError when receive times out"""
261+
writer_id = 1
262+
settings = WriterSettings(PublicWriterSettings("test-topic", "test-producer"))
263+
264+
# Mock stream.receive to directly raise TimeoutError when called with timeout
265+
async def timeout_receive(timeout=None):
266+
if timeout == DEFAULT_INITIAL_RESPONSE_TIMEOUT:
267+
raise asyncio.TimeoutError("Simulated timeout")
268+
return StreamWriteMessage.InitResponse(
269+
last_seq_no=0,
270+
session_id="test_session",
271+
partition_id=1,
272+
supported_codecs=[Codec.CODEC_RAW],
273+
status=ServerStatus(StatusCode.SUCCESS, []),
274+
)
275+
276+
with mock.patch.object(stream, "receive", side_effect=timeout_receive):
277+
writer = WriterAsyncIOStream(writer_id, settings)
278+
279+
# Should raise TopicWriterError with timeout message
280+
with pytest.raises(TopicWriterError, match="Timeout waiting for init response"):
281+
await writer._start(stream, settings.create_init_request())
282+
283+
# Don't close writer since _start failed and _stream was never set
284+
234285

235286
@pytest.mark.asyncio
236287
class TestWriterAsyncIOReconnector:

ydb/aio/query/session.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,11 @@
1515
from ...query import base
1616
from ...query.session import (
1717
BaseQuerySession,
18-
DEFAULT_ATTACH_FIRST_RESP_TIMEOUT,
1918
QuerySessionStateEnum,
2019
)
2120

21+
from ..._constants import DEFAULT_INITIAL_RESPONSE_TIMEOUT
22+
2223

2324
class QuerySession(BaseQuerySession):
2425
"""Session object for Query Service. It is not recommended to control
@@ -47,7 +48,7 @@ async def _attach(self) -> None:
4748
try:
4849
first_response = await _utilities.get_first_message_with_timeout(
4950
self._status_stream,
50-
DEFAULT_ATTACH_FIRST_RESP_TIMEOUT,
51+
DEFAULT_INITIAL_RESPONSE_TIMEOUT,
5152
)
5253
if first_response.status != issues.StatusCode.SUCCESS:
5354
raise RuntimeError("Failed to attach session")

ydb/query/session.py

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,10 @@
1818

1919
from .transaction import QueryTxContext
2020

21-
22-
logger = logging.getLogger(__name__)
21+
from .._constants import DEFAULT_INITIAL_RESPONSE_TIMEOUT, DEFAULT_LONG_STREAM_TIMEOUT
2322

2423

25-
DEFAULT_ATTACH_FIRST_RESP_TIMEOUT = 600
26-
DEFAULT_ATTACH_LONG_TIMEOUT = 31536000 # year
24+
logger = logging.getLogger(__name__)
2725

2826

2927
class QuerySessionStateEnum(enum.Enum):
@@ -142,9 +140,9 @@ def __init__(self, driver: common_utils.SupportedDriverType, settings: Optional[
142140
self._state = QuerySessionState(settings)
143141
self._attach_settings: BaseRequestSettings = (
144142
BaseRequestSettings()
145-
.with_operation_timeout(DEFAULT_ATTACH_LONG_TIMEOUT)
146-
.with_cancel_after(DEFAULT_ATTACH_LONG_TIMEOUT)
147-
.with_timeout(DEFAULT_ATTACH_LONG_TIMEOUT)
143+
.with_operation_timeout(DEFAULT_LONG_STREAM_TIMEOUT)
144+
.with_cancel_after(DEFAULT_LONG_STREAM_TIMEOUT)
145+
.with_timeout(DEFAULT_LONG_STREAM_TIMEOUT)
148146
)
149147

150148
self._last_query_stats = None
@@ -233,7 +231,7 @@ class QuerySession(BaseQuerySession):
233231

234232
_stream = None
235233

236-
def _attach(self, first_resp_timeout: int = DEFAULT_ATTACH_FIRST_RESP_TIMEOUT) -> None:
234+
def _attach(self, first_resp_timeout: int = DEFAULT_INITIAL_RESPONSE_TIMEOUT) -> None:
237235
self._stream = self._attach_call()
238236
status_stream = _utilities.SyncResponseIterator(
239237
self._stream,

0 commit comments

Comments
 (0)