diff --git a/CHANGELOG.md b/CHANGELOG.md index d154fa39..cc889480 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,5 @@ +* Add to public topic reader api: TopicReaderBatch, wait_message + ## 3.3.7 ## * Added copy of locals() dicts at internals diff --git a/tests/topics/test_topic_reader.py b/tests/topics/test_topic_reader.py index 46bc0faa..2893aa55 100644 --- a/tests/topics/test_topic_reader.py +++ b/tests/topics/test_topic_reader.py @@ -22,6 +22,7 @@ async def test_link_to_client(self, driver, topic_path, topic_consumer): async def test_read_message(self, driver, topic_with_messages, topic_consumer): reader = driver.topic_client.reader(topic_with_messages, topic_consumer) + await reader.wait_message() msg = await reader.receive_message() assert msg is not None diff --git a/ydb/_topic_reader/datatypes.py b/ydb/_topic_reader/datatypes.py index 1b767e7c..28155ea7 100644 --- a/ydb/_topic_reader/datatypes.py +++ b/ydb/_topic_reader/datatypes.py @@ -71,20 +71,14 @@ class PartitionSession: _ack_waiters: Deque["PartitionSession.CommitAckWaiter"] = field(init=False, default_factory=lambda: deque()) _state_changed: asyncio.Event = field(init=False, default_factory=lambda: asyncio.Event(), compare=False) - _loop: Optional[asyncio.AbstractEventLoop] = field(init=False) # may be None in tests def __post_init__(self): self._next_message_start_commit_offset = self.committed_offset - try: - self._loop = asyncio.get_running_loop() - except RuntimeError: - self._loop = None - def add_waiter(self, end_offset: int) -> "PartitionSession.CommitAckWaiter": self._ensure_not_closed() - waiter = PartitionSession.CommitAckWaiter(end_offset, self._create_future()) + waiter = PartitionSession.CommitAckWaiter(end_offset, asyncio.Future()) if end_offset <= self.committed_offset: waiter._finish_ok() return waiter @@ -97,11 +91,6 @@ def add_waiter(self, end_offset: int) -> "PartitionSession.CommitAckWaiter": return waiter - def _create_future(self) -> asyncio.Future: - if self._loop: - return self._loop.create_future() - return asyncio.Future() - def ack_notify(self, offset: int): self._ensure_not_closed() diff --git a/ydb/_topic_reader/topic_reader_asyncio.py b/ydb/_topic_reader/topic_reader_asyncio.py index facd0853..50684f7c 100644 --- a/ydb/_topic_reader/topic_reader_asyncio.py +++ b/ydb/_topic_reader/topic_reader_asyncio.py @@ -88,6 +88,12 @@ def __del__(self): if not self._closed: self._loop.create_task(self.close(flush=False), name="close reader") + async def wait_message(self): + """ + Wait at least one message from reader. + """ + await self._reconnector.wait_message() + async def receive_batch( self, ) -> typing.Union[datatypes.PublicBatch, None]: diff --git a/ydb/_topic_reader/topic_reader_sync.py b/ydb/_topic_reader/topic_reader_sync.py index 5c8db630..e5b4e1a2 100644 --- a/ydb/_topic_reader/topic_reader_sync.py +++ b/ydb/_topic_reader/topic_reader_sync.py @@ -84,7 +84,7 @@ def async_wait_message(self) -> concurrent.futures.Future: """ self._check_closed() - return self._caller.unsafe_call_with_future(self._async_reader._reconnector.wait_message()) + return self._caller.unsafe_call_with_future(self._async_reader.wait_message()) def receive_batch( self, diff --git a/ydb/topic.py b/ydb/topic.py index e0d3cf2d..00ffb1c4 100644 --- a/ydb/topic.py +++ b/ydb/topic.py @@ -11,6 +11,7 @@ "TopicMeteringMode", "TopicReader", "TopicReaderAsyncIO", + "TopicReaderBatch", "TopicReaderMessage", "TopicReaderSelector", "TopicReaderSettings", @@ -33,7 +34,10 @@ from . import driver -from ._topic_reader.datatypes import PublicMessage as TopicReaderMessage +from ._topic_reader.datatypes import ( + PublicBatch as TopicReaderBatch, + PublicMessage as TopicReaderMessage, +) from ._topic_reader.topic_reader import ( PublicReaderSettings as TopicReaderSettings,