Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 3 additions & 5 deletions tests/aio/query/test_query_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,7 @@
from ydb.aio.query.session import QuerySession


def _check_session_state_empty(session: QuerySession):
assert session._state.session_id is None
assert session._state.node_id is None
def _check_session_not_attached(session: QuerySession):
assert not session._state.attached


Expand All @@ -22,13 +20,13 @@ def _check_session_state_full(session: QuerySession):
class TestAsyncQuerySession:
@pytest.mark.asyncio
async def test_session_normal_lifecycle(self, session: QuerySession):
_check_session_state_empty(session)
_check_session_not_attached(session)

await session.create()
_check_session_state_full(session)

await session.delete()
_check_session_state_empty(session)
_check_session_not_attached(session)

@pytest.mark.asyncio
async def test_second_create_do_nothing(self, session: QuerySession):
Expand Down
6 changes: 6 additions & 0 deletions tests/aio/query/test_query_transaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,12 @@ async def test_tx_commit_before_begin(self, tx: QueryTxContext):
await tx.commit()
assert tx._tx_state._state == QueryTxStateEnum.COMMITTED

@pytest.mark.asyncio
async def test_tx_rollback_after_session_detached(self, tx: QueryTxContext):
async with tx:
await tx.begin()
await tx.session.delete()

@pytest.mark.asyncio
async def test_tx_rollback_before_begin(self, tx: QueryTxContext):
await tx.rollback()
Expand Down
10 changes: 4 additions & 6 deletions tests/query/test_query_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,7 @@
from ydb.query.session import QuerySession


def _check_session_state_empty(session: QuerySession):
assert session._state.session_id is None
assert session._state.node_id is None
def _check_session_not_attached(session: QuerySession):
assert not session._state.attached


Expand All @@ -25,13 +23,13 @@ def _check_session_state_full(session: QuerySession):

class TestQuerySession:
def test_session_normal_lifecycle(self, session: QuerySession):
_check_session_state_empty(session)
_check_session_not_attached(session)

session.create()
_check_session_state_full(session)

session.delete()
_check_session_state_empty(session)
_check_session_not_attached(session)

def test_second_create_do_nothing(self, session: QuerySession):
session.create()
Expand Down Expand Up @@ -145,7 +143,7 @@ def cancel(self):
assert "first response attach stream thread" not in thread_names
assert "attach stream thread" not in thread_names

_check_session_state_empty(session)
_check_session_not_attached(session)

@pytest.mark.parametrize(
"stats_mode",
Expand Down
5 changes: 5 additions & 0 deletions tests/query/test_query_transaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@ def test_tx_commit_before_begin(self, tx: QueryTxContext):
tx.commit()
assert tx._tx_state._state == QueryTxStateEnum.COMMITTED

def test_tx_rollback_after_session_detached(self, tx: QueryTxContext):
with tx:
tx.begin()
tx.session.delete()

def test_tx_rollback_before_begin(self, tx: QueryTxContext):
tx.rollback()
assert tx._tx_state._state == QueryTxStateEnum.ROLLBACKED
Expand Down
6 changes: 3 additions & 3 deletions ydb/aio/query/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ async def _attach(self) -> None:
if first_response.status != issues.StatusCode.SUCCESS:
raise RuntimeError("Failed to attach session")
except Exception as e:
self._state.reset()
self._state.set_attached(False)
self._status_stream.cancel()
raise e

Expand All @@ -70,11 +70,11 @@ async def _check_session_status_loop(self) -> None:
try:
async for status in self._status_stream:
if status.status != issues.StatusCode.SUCCESS:
self._state.reset()
self._state.set_attached(False)
self._state._change_state(QuerySessionStateEnum.CLOSED)
except Exception:
if not self._state._already_in(QuerySessionStateEnum.CLOSED):
self._state.reset()
self._state.set_attached(False)
self._state._change_state(QuerySessionStateEnum.CLOSED)

async def delete(self, settings: Optional[BaseRequestSettings] = None) -> None:
Expand Down
4 changes: 4 additions & 0 deletions ydb/aio/query/transaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ async def __aexit__(self, *args, **kwargs):
it is not finished explicitly
"""
await self._ensure_prev_stream_finished()

if not self._session_state.attached:
return

if self._tx_state._state == QueryTxStateEnum.BEGINED and self._external_error is None:
# It's strictly recommended to close transactions directly
# by using commit_tx=True flag while executing statement or by
Expand Down
6 changes: 1 addition & 5 deletions ydb/query/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,10 +111,6 @@ class IQuerySessionState(abc.ABC):
def __init__(self, settings: Optional[QueryClientSettings] = None):
pass

@abc.abstractmethod
def reset(self) -> None:
pass

@property
@abc.abstractmethod
def session_id(self) -> Optional[str]:
Expand Down Expand Up @@ -210,7 +206,7 @@ def decorator(rpc_state, response_pb, session_state: IQuerySessionState, *args,
try:
return func(rpc_state, response_pb, session_state, *args, **kwargs)
except issues.BadSession:
session_state.reset()
session_state.set_attached(False)
raise

return decorator
Expand Down
13 changes: 4 additions & 9 deletions ydb/query/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,11 +65,6 @@ class QuerySessionState(base.IQuerySessionState):
def __init__(self, settings: base.QueryClientSettings = None):
self._settings = settings

def reset(self) -> None:
self._session_id = None
self._node_id = None
self._attached = False

@property
def session_id(self) -> Optional[str]:
return self._session_id
Expand Down Expand Up @@ -129,7 +124,7 @@ def wrapper_delete_session(
) -> "BaseQuerySession":
message = _ydb_query.DeleteSessionResponse.from_proto(response_pb)
issues._process_response(message.status)
session_state.reset()
session_state.set_attached(False)
session_state._change_state(QuerySessionStateEnum.CLOSED)
return session

Expand Down Expand Up @@ -257,7 +252,7 @@ def _attach(self, first_resp_timeout: int = DEFAULT_INITIAL_RESPONSE_TIMEOUT) ->
if first_response.status != issues.StatusCode.SUCCESS:
raise RuntimeError("Failed to attach session")
except Exception as e:
self._state.reset()
self._state.set_attached(False)
status_stream.cancel()
raise e

Expand All @@ -275,11 +270,11 @@ def _check_session_status_loop(self, status_stream: _utilities.SyncResponseItera
try:
for status in status_stream:
if status.status != issues.StatusCode.SUCCESS:
self._state.reset()
self._state.set_attached(False)
self._state._change_state(QuerySessionStateEnum.CLOSED)
except Exception:
if not self._state._already_in(QuerySessionStateEnum.CLOSED):
self._state.reset()
self._state.set_attached(False)
self._state._change_state(QuerySessionStateEnum.CLOSED)

def delete(self, settings: Optional[BaseRequestSettings] = None) -> None:
Expand Down
4 changes: 4 additions & 0 deletions ydb/query/transaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,10 @@ def __exit__(self, *args, **kwargs):
it is not finished explicitly
"""
self._ensure_prev_stream_finished()

if not self._session_state.attached:
return

if self._tx_state._state == QueryTxStateEnum.BEGINED and self._external_error is None:
# It's strictly recommended to close transactions directly
# by using commit_tx=True flag while executing statement or by
Expand Down
Loading