@@ -39,11 +39,13 @@ class BaseConnection:
39
39
_tx_mode : ydb .BaseQueryTxMode = ydb .QuerySerializableReadWrite ()
40
40
_tx_context : ydb .QueryTxContext | ydb .aio .QueryTxContext | None = None
41
41
interactive_transaction : bool = False
42
-
43
42
_shared_session_pool : bool = False
43
+
44
44
_driver_cls = ydb .Driver
45
- _driver : ydb .Driver | ydb .aio .Driver
46
45
_pool_cls = ydb .QuerySessionPool
46
+ _cursor_cls : type [Cursor | AsyncCursor ] = Cursor
47
+
48
+ _driver : ydb .Driver | ydb .aio .Driver
47
49
_pool : ydb .QuerySessionPool | ydb .aio .QuerySessionPool
48
50
49
51
_current_cursor : AsyncCursor | Cursor | None = None
@@ -130,10 +132,31 @@ def get_isolation_level(self) -> str:
130
132
msg = f"{ self ._tx_mode .name } is not supported"
131
133
raise NotSupportedError (msg )
132
134
135
+ def cursor (self ) -> Cursor | AsyncCursor :
136
+ if self ._session is None :
137
+ raise RuntimeError ("Connection is not ready, use wait_ready." )
138
+ if self ._current_cursor and not self ._current_cursor .is_closed :
139
+ raise RuntimeError (
140
+ "Unable to create new Cursor before closing existing one."
141
+ )
142
+
143
+ if self .interactive_transaction :
144
+ self ._tx_context = self ._session .transaction (self ._tx_mode )
145
+ else :
146
+ self ._tx_context = None
147
+
148
+ self ._current_cursor = self ._cursor_cls (
149
+ session = self ._session ,
150
+ tx_context = self ._tx_context ,
151
+ autocommit = (not self .interactive_transaction ),
152
+ )
153
+ return self ._current_cursor
154
+
133
155
134
156
class Connection (BaseConnection ):
135
157
_driver_cls = ydb .Driver
136
158
_pool_cls = ydb .QuerySessionPool
159
+ _cursor_cls = Cursor
137
160
138
161
_driver : ydb .Driver
139
162
_pool : ydb .QuerySessionPool
@@ -154,26 +177,6 @@ def wait_ready(self, timeout: int = 10) -> None:
154
177
155
178
self ._session = self ._session_pool .acquire ()
156
179
157
- def cursor (self ) -> Cursor :
158
- if self ._session is None :
159
- raise RuntimeError ("Connection is not ready, use wait_ready." )
160
- if self ._current_cursor and not self ._current_cursor .is_closed :
161
- raise RuntimeError (
162
- "Unable to create new Cursor before closing existing one."
163
- )
164
-
165
- if self .interactive_transaction :
166
- self ._tx_context = self ._session .transaction (self ._tx_mode )
167
- else :
168
- self ._tx_context = None
169
-
170
- self ._current_cursor = Cursor (
171
- session = self ._session ,
172
- tx_context = self ._tx_context ,
173
- autocommit = (not self .interactive_transaction ),
174
- )
175
- return self ._current_cursor
176
-
177
180
def commit (self ) -> None :
178
181
if self ._tx_context and self ._tx_context .tx_id :
179
182
self ._tx_context .commit ()
@@ -247,6 +250,7 @@ def callee() -> ydb.Directory:
247
250
class AsyncConnection (BaseConnection ):
248
251
_driver_cls = ydb .aio .Driver
249
252
_pool_cls = ydb .aio .QuerySessionPool
253
+ _cursor_cls = AsyncCursor
250
254
251
255
_driver : ydb .aio .Driver
252
256
_pool : ydb .aio .QuerySessionPool
@@ -267,26 +271,6 @@ async def wait_ready(self, timeout: int = 10) -> None:
267
271
268
272
self ._session = await self ._session_pool .acquire ()
269
273
270
- def cursor (self ) -> AsyncCursor :
271
- if self ._session is None :
272
- raise RuntimeError ("Connection is not ready, use wait_ready." )
273
- if self ._current_cursor and not self ._current_cursor .is_closed :
274
- raise RuntimeError (
275
- "Unable to create new Cursor before closing existing one."
276
- )
277
-
278
- if self .interactive_transaction :
279
- self ._tx_context = self ._session .transaction (self ._tx_mode )
280
- else :
281
- self ._tx_context = None
282
-
283
- self ._current_cursor = AsyncCursor (
284
- session = self ._session ,
285
- tx_context = self ._tx_context ,
286
- autocommit = (not self .interactive_transaction ),
287
- )
288
- return self ._current_cursor
289
-
290
274
async def commit (self ) -> None :
291
275
if self ._tx_context and self ._tx_context .tx_id :
292
276
await self ._tx_context .commit ()
0 commit comments