bug: asyncio.Cancelled error from TopicReader #735

@victimsnino

Bug Report

YDB Python SDK version:

3.22.1

Environment

arc

Current behavior:

While using TopicWriterAsyncIO to push chunks of data, we hit an exception like this:

    await self.writer.wait_init()
  File "deps/python/ydb/py3/ydb/_topic_writer/topic_writer_asyncio.py", line 192, in wait_init
    return await self._reconnector.wait_init()
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "deps/python/ydb/py3/ydb/_topic_writer/topic_writer_asyncio.py", line 360, in wait_init
    raise self._stop_reason.exception()
  <CALL_STACK>
    await self.writer.write(message)
  File "deps/python/ydb/py3/ydb/_topic_writer/topic_writer_asyncio.py", line 173, in write
    await self.write_with_ack_future(messages)
  File "deps/python/ydb/py3/ydb/_topic_writer/topic_writer_asyncio.py", line 153, in write_with_ack_future
    futures = await self._reconnector.write_with_ack_future(converted_messages)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "deps/python/ydb/py3/ydb/_topic_writer/topic_writer_asyncio.py", line 375, in write_with_ack_future
    self._check_stop()
  File "deps/python/ydb/py3/ydb/_topic_writer/topic_writer_asyncio.py", line 434, in _check_stop
    raise self._stop_reason.exception()
  File "deps/python/ydb/py3/ydb/_topic_writer/topic_writer_asyncio.py", line 448, in _connection_loop
    stream_writer = await WriterAsyncIOStream.create(
                    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "deps/python/ydb/py3/ydb/_topic_writer/topic_writer_asyncio.py", line 780, in create
    await writer._start(stream, init_request)
  File "deps/python/ydb/py3/ydb/_topic_writer/topic_writer_asyncio.py", line 806, in _start
    resp = await stream.receive(timeout=DEFAULT_INITIAL_RESPONSE_TIMEOUT)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "deps/python/ydb/py3/ydb/_grpc/grpcwrapper/common_utils.py", line 232, in receive
    grpc_message = await asyncio.wait_for(get_response(), timeout)
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "deps/tools/python3/Lib/asyncio/tasks.py", line 520, in wait_for
    return await fut
           ^^^^^^^^^
  File "deps/python/ydb/py3/ydb/_grpc/grpcwrapper/common_utils.py", line 230, in get_response
    return await self.from_server_grpc.__anext__()
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "deps/python/ydb/py3/ydb/aio/connection.py", line 122, in __anext__
    return await self.it.__anext__()
           ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "deps/python/grpcio/py3/grpc/aio/_interceptor.py", line 487, in _wait_for_interceptor_task_response_iterator
    async for response in call:
  File "deps/python/opentelemetry-instrumentation-grpc/opentelemetry/instrumentation/grpc/_aio_client.py", line 130, in _wrap_stream_response
    async for response in call:
  File "deps/python/grpcio/py3/grpc/aio/_call.py", line 356, in _fetch_stream_responses
    await self._raise_for_status()
  File "deps/python/grpcio/py3/grpc/aio/_call.py", line 260, in _raise_for_status
    raise asyncio.CancelledError()
asyncio.exceptions.CancelledError

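The exception is raised from write() and then again from wait_init() on the same writer instance; per the traceback, both calls re-raise the writer's stored _stop_reason.
The time of the exception matches YDB host downtime/reconnection.

Recreating the writer from scratch after catching the exception appears to work around the problem.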
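
For illustration, the workaround above could be sketched roughly as follows. This is not SDK code: `RecreatingWriter`, `make_writer`, `max_attempts` and `retry_delay` are hypothetical names, and `make_writer` stands for whatever your application already does to construct a TopicWriterAsyncIO. The sketch assumes only that the writer has awaitable `write()` and `close()` methods, and that Python 3.11+ is used so `Task.cancelling()` can tell a real cancellation of the calling task apart from the CancelledError surfaced by the stopped writer.

```python
import asyncio
from typing import Any, Callable, Optional


def _task_is_being_cancelled() -> bool:
    """True if the current task itself was asked to cancel (Python 3.11+)."""
    task = asyncio.current_task()
    cancelling = getattr(task, "cancelling", None)
    return bool(cancelling and cancelling())


class RecreatingWriter:
    """Hypothetical wrapper that rebuilds the underlying writer after a failure."""

    def __init__(
        self,
        make_writer: Callable[[], Any],  # assumption: returns a fresh writer
        max_attempts: int = 3,
        retry_delay: float = 1.0,
    ) -> None:
        self._make_writer = make_writer
        self._max_attempts = max_attempts
        self._retry_delay = retry_delay
        self._writer: Optional[Any] = None

    async def _discard_writer(self) -> None:
        """Drop the broken writer, closing it on a best-effort basis."""
        writer, self._writer = self._writer, None
        if writer is None:
            return
        try:
            await writer.close()
        except (Exception, asyncio.CancelledError):
            # A stopped writer may fail on close too; only a real
            # cancellation of our own task is propagated.
            if _task_is_being_cancelled():
                raise

    async def write(self, message: Any) -> Any:
        last_error: Optional[BaseException] = None
        for _ in range(self._max_attempts):
            try:
                if self._writer is None:
                    self._writer = self._make_writer()
                return await self._writer.write(message)
            except (Exception, asyncio.CancelledError) as error:
                if isinstance(error, asyncio.CancelledError) and _task_is_being_cancelled():
                    raise  # our own task is being cancelled: do not retry
                last_error = error
                await self._discard_writer()
                await asyncio.sleep(self._retry_delay)
        raise RuntimeError("write failed after recreating the writer") from last_error
```

Note that this catches every exception, which is deliberately broad for a sketch, and that retrying a write after a failure may deliver the same message more than once unless deduplication is handled elsewhere.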

Expected behavior:

The SDK should handle connection issues itself (reconnecting internally) instead of surfacing asyncio.CancelledError to the caller.

Steps to reproduce:

Related code:

Other information:
