diff --git a/tests/aio/query/test_query_session.py b/tests/aio/query/test_query_session.py index cbc233d6..4a81975f 100644 --- a/tests/aio/query/test_query_session.py +++ b/tests/aio/query/test_query_session.py @@ -7,9 +7,7 @@ from ydb.aio.query.session import QuerySession -def _check_session_state_empty(session: QuerySession): - assert session._state.session_id is None - assert session._state.node_id is None +def _check_session_not_attached(session: QuerySession): assert not session._state.attached @@ -22,13 +20,13 @@ def _check_session_state_full(session: QuerySession): class TestAsyncQuerySession: @pytest.mark.asyncio async def test_session_normal_lifecycle(self, session: QuerySession): - _check_session_state_empty(session) + _check_session_not_attached(session) await session.create() _check_session_state_full(session) await session.delete() - _check_session_state_empty(session) + _check_session_not_attached(session) @pytest.mark.asyncio async def test_second_create_do_nothing(self, session: QuerySession): diff --git a/tests/aio/query/test_query_transaction.py b/tests/aio/query/test_query_transaction.py index b2a8ef32..7958f386 100644 --- a/tests/aio/query/test_query_transaction.py +++ b/tests/aio/query/test_query_transaction.py @@ -30,6 +30,12 @@ async def test_tx_commit_before_begin(self, tx: QueryTxContext): await tx.commit() assert tx._tx_state._state == QueryTxStateEnum.COMMITTED + @pytest.mark.asyncio + async def test_tx_rollback_after_session_detached(self, tx: QueryTxContext): + async with tx: + await tx.begin() + await tx.session.delete() + @pytest.mark.asyncio async def test_tx_rollback_before_begin(self, tx: QueryTxContext): await tx.rollback() diff --git a/tests/query/test_query_session.py b/tests/query/test_query_session.py index f84c2061..d4b71bd0 100644 --- a/tests/query/test_query_session.py +++ b/tests/query/test_query_session.py @@ -11,9 +11,7 @@ from ydb.query.session import QuerySession -def _check_session_state_empty(session: QuerySession): - assert session._state.session_id is None - assert session._state.node_id is None +def _check_session_not_attached(session: QuerySession): assert not session._state.attached @@ -25,13 +23,13 @@ def _check_session_state_full(session: QuerySession): class TestQuerySession: def test_session_normal_lifecycle(self, session: QuerySession): - _check_session_state_empty(session) + _check_session_not_attached(session) session.create() _check_session_state_full(session) session.delete() - _check_session_state_empty(session) + _check_session_not_attached(session) def test_second_create_do_nothing(self, session: QuerySession): session.create() @@ -145,7 +143,7 @@ def cancel(self): assert "first response attach stream thread" not in thread_names assert "attach stream thread" not in thread_names - _check_session_state_empty(session) + _check_session_not_attached(session) @pytest.mark.parametrize( "stats_mode", diff --git a/tests/query/test_query_transaction.py b/tests/query/test_query_transaction.py index 77c7251b..9d9a438d 100644 --- a/tests/query/test_query_transaction.py +++ b/tests/query/test_query_transaction.py @@ -26,6 +26,11 @@ def test_tx_commit_before_begin(self, tx: QueryTxContext): tx.commit() assert tx._tx_state._state == QueryTxStateEnum.COMMITTED + def test_tx_rollback_after_session_detached(self, tx: QueryTxContext): + with tx: + tx.begin() + tx.session.delete() + def test_tx_rollback_before_begin(self, tx: QueryTxContext): tx.rollback() assert tx._tx_state._state == QueryTxStateEnum.ROLLBACKED diff --git a/ydb/aio/query/session.py b/ydb/aio/query/session.py index 01e6bb27..68b5ecd6 100644 --- a/ydb/aio/query/session.py +++ b/ydb/aio/query/session.py @@ -57,7 +57,7 @@ async def _attach(self) -> None: if first_response.status != issues.StatusCode.SUCCESS: raise RuntimeError("Failed to attach session") except Exception as e: - self._state.reset() + self._state.set_attached(False) self._status_stream.cancel() raise e @@ -70,11 +70,11 @@ async def _check_session_status_loop(self) -> None: try: async for status in self._status_stream: if status.status != issues.StatusCode.SUCCESS: - self._state.reset() + self._state.set_attached(False) self._state._change_state(QuerySessionStateEnum.CLOSED) except Exception: if not self._state._already_in(QuerySessionStateEnum.CLOSED): - self._state.reset() + self._state.set_attached(False) self._state._change_state(QuerySessionStateEnum.CLOSED) async def delete(self, settings: Optional[BaseRequestSettings] = None) -> None: diff --git a/ydb/aio/query/transaction.py b/ydb/aio/query/transaction.py index d8e99ef3..81d10b5d 100644 --- a/ydb/aio/query/transaction.py +++ b/ydb/aio/query/transaction.py @@ -52,6 +52,10 @@ async def __aexit__(self, *args, **kwargs): it is not finished explicitly """ await self._ensure_prev_stream_finished() + + if not self._session_state.attached: + return + if self._tx_state._state == QueryTxStateEnum.BEGINED and self._external_error is None: # It's strictly recommended to close transactions directly # by using commit_tx=True flag while executing statement or by diff --git a/ydb/query/base.py b/ydb/query/base.py index 524db6e4..5063986f 100644 --- a/ydb/query/base.py +++ b/ydb/query/base.py @@ -111,10 +111,6 @@ class IQuerySessionState(abc.ABC): def __init__(self, settings: Optional[QueryClientSettings] = None): pass - @abc.abstractmethod - def reset(self) -> None: - pass - @property @abc.abstractmethod def session_id(self) -> Optional[str]: @@ -210,7 +206,7 @@ def decorator(rpc_state, response_pb, session_state: IQuerySessionState, *args, try: return func(rpc_state, response_pb, session_state, *args, **kwargs) except issues.BadSession: - session_state.reset() + session_state.set_attached(False) raise return decorator diff --git a/ydb/query/session.py b/ydb/query/session.py index 3c0f7bc3..407d99b2 100644 --- a/ydb/query/session.py +++ b/ydb/query/session.py @@ -65,11 +65,6 @@ class QuerySessionState(base.IQuerySessionState): def __init__(self, settings: base.QueryClientSettings = None): self._settings = settings - def reset(self) -> None: - self._session_id = None - self._node_id = None - self._attached = False - @property def session_id(self) -> Optional[str]: return self._session_id @@ -129,7 +124,7 @@ def wrapper_delete_session( ) -> "BaseQuerySession": message = _ydb_query.DeleteSessionResponse.from_proto(response_pb) issues._process_response(message.status) - session_state.reset() + session_state.set_attached(False) session_state._change_state(QuerySessionStateEnum.CLOSED) return session @@ -257,7 +252,7 @@ def _attach(self, first_resp_timeout: int = DEFAULT_INITIAL_RESPONSE_TIMEOUT) -> if first_response.status != issues.StatusCode.SUCCESS: raise RuntimeError("Failed to attach session") except Exception as e: - self._state.reset() + self._state.set_attached(False) status_stream.cancel() raise e @@ -275,11 +270,11 @@ def _check_session_status_loop(self, status_stream: _utilities.SyncResponseItera try: for status in status_stream: if status.status != issues.StatusCode.SUCCESS: - self._state.reset() + self._state.set_attached(False) self._state._change_state(QuerySessionStateEnum.CLOSED) except Exception: if not self._state._already_in(QuerySessionStateEnum.CLOSED): - self._state.reset() + self._state.set_attached(False) self._state._change_state(QuerySessionStateEnum.CLOSED) def delete(self, settings: Optional[BaseRequestSettings] = None) -> None: diff --git a/ydb/query/transaction.py b/ydb/query/transaction.py index 05554296..cf86015b 100644 --- a/ydb/query/transaction.py +++ b/ydb/query/transaction.py @@ -377,6 +377,10 @@ def __exit__(self, *args, **kwargs): it is not finished explicitly """ self._ensure_prev_stream_finished() + + if not self._session_state.attached: + return + if self._tx_state._state == QueryTxStateEnum.BEGINED and self._external_error is None: # It's strictly recommended to close transactions directly # by using commit_tx=True flag while executing statement or by