Closed
Bug Report
ydb == 3.18.14
Environment
Python == 3.12
Current behavior:
asyncio.CancelledError is raised without a clear or consistent trigger, which interrupts the message writing process.
Expected behavior:
The message should be written to the topic successfully, or a more specific exception should be raised.
Steps to reproduce:
Attempt to send a message using:
writer: ydb.topic.TopicWriterAsyncIO
messages: ydb.topic.TopicWriterMessage
await asyncio.wait_for(writer.write_with_ack(messages), timeout)
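A minimal sketch of the symptom as seen by the caller, runnable without a YDB cluster. `StubWriter` and `send` are hypothetical names introduced only for illustration; the stub is not the real `ydb.topic.TopicWriterAsyncIO`, it just raises `asyncio.CancelledError` from inside `write_with_ack` the way the traceback suggests. It shows that the error reaches the caller even though the caller's task was never cancelled and the timeout did not expire:

```python
import asyncio


class StubWriter:
    """Hypothetical stand-in for ydb.topic.TopicWriterAsyncIO (not the SDK class)."""

    async def write_with_ack(self, messages):
        # Assumption: simulate the failure from the traceback, where the
        # cancellation originates inside the writer rather than in the caller.
        await asyncio.sleep(0)
        raise asyncio.CancelledError()


async def send(writer, messages, timeout):
    try:
        await asyncio.wait_for(writer.write_with_ack(messages), timeout)
        return "ok"
    except asyncio.TimeoutError:
        return "timeout"
    except asyncio.CancelledError:
        # Reached although nobody cancelled this task and the timeout is far away.
        return "cancelled"


outcome = asyncio.run(send(StubWriter(), ["payload"], timeout=5.0))
print(outcome)
```

With the stub, the caller cannot tell this case apart from a genuine cancellation of its own task, which is what makes the behavior hard to handle.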
Related code:
The SDK traceback is:
File "contrib/tools/python3/Lib/asyncio/tasks.py", line 520, in wait_for
return await fut
^^^^^^^^^
File "contrib/python/ydb/py3/ydb/_topic_writer/topic_writer_asyncio.py", line 102, in write_with_ack
futures = await self.write_with_ack_future(messages)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "contrib/python/ydb/py3/ydb/_topic_writer/topic_writer_asyncio.py", line 131, in write_with_ack_future
futures = await self._reconnector.write_with_ack_future(converted_messages)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "contrib/python/ydb/py3/ydb/_topic_writer/topic_writer_asyncio.py", line 286, in write_with_ack_future
self._check_stop()
File "contrib/python/ydb/py3/ydb/_topic_writer/topic_writer_asyncio.py", line 345, in _check_stop
raise self._stop_reason.exception()
File "contrib/python/ydb/py3/ydb/_topic_writer/topic_writer_asyncio.py", line 387, in _connection_loop
done.pop().result() # need for raise exception - reason of stop task
^^^^^^^^^^^^^^^^^^^
File "contrib/python/ydb/py3/ydb/_topic_writer/topic_writer_asyncio.py", line 517, in _read_loop
resp = await writer.receive()
^^^^^^^^^^^^^^^^^^^^^^
File "contrib/python/ydb/py3/ydb/_topic_writer/topic_writer_asyncio.py", line 643, in receive
item = await self._stream.receive()
^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "contrib/python/ydb/py3/ydb/_grpc/grpcwrapper/common_utils.py", line 205, in receive
grpc_message = await self.from_server_grpc.__anext__()
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "contrib/python/grpcio/py3/grpc/aio/_call.py", line 326, in _fetch_stream_responses
await self._raise_for_status()
File "contrib/python/grpcio/py3/grpc/aio/_call.py", line 233, in _raise_for_status
raise asyncio.CancelledError()
asyncio.exceptions.CancelledError
Other information:
This occurs inconsistently (quite rarely, in fact). We have not been able to determine the root cause or reproduce it reliably because of its unpredictable nature.