From 0d48427c5412ee68b049bc53bd3df66f7b9843a5 Mon Sep 17 00:00:00 2001 From: Timofey Koolin Date: Sat, 18 Mar 2023 15:43:29 +0300 Subject: [PATCH 1/5] init read message --- ydb/_topic_reader/datatypes.py | 11 + ydb/_topic_reader/topic_reader_asyncio.py | 35 +- .../topic_reader_asyncio_test.py | 461 +++++++++++------- ydb/_topic_reader/topic_reader_sync.py | 31 +- 4 files changed, 358 insertions(+), 180 deletions(-) diff --git a/ydb/_topic_reader/datatypes.py b/ydb/_topic_reader/datatypes.py index 3845995f..860525ab 100644 --- a/ydb/_topic_reader/datatypes.py +++ b/ydb/_topic_reader/datatypes.py @@ -179,6 +179,9 @@ def _commit_get_offsets_range(self) -> OffsetsRange: self.messages[-1]._commit_get_offsets_range().end, ) + def is_empty(self) -> bool: + return len(self.messages) == 0 + # ISessionAlive implementation @property def is_alive(self) -> bool: @@ -187,3 +190,11 @@ def is_alive(self) -> bool: state == PartitionSession.State.Active or state == PartitionSession.State.GracefulShutdown ) + + def pop_message(self) -> PublicMessage: + if len(self.messages) == 0: + raise IndexError() + + res = self.messages[0] + self.messages = self.messages[1:] + return res diff --git a/ydb/_topic_reader/topic_reader_asyncio.py b/ydb/_topic_reader/topic_reader_asyncio.py index bb87d3cc..c74f7d09 100644 --- a/ydb/_topic_reader/topic_reader_asyncio.py +++ b/ydb/_topic_reader/topic_reader_asyncio.py @@ -95,14 +95,6 @@ def messages( """ raise NotImplementedError() - async def receive_message(self) -> typing.Union[topic_reader.PublicMessage, None]: - """ - Block until receive new message - - use asyncio.wait_for for wait with timeout. - """ - raise NotImplementedError() - def batches( self, *, @@ -133,6 +125,15 @@ async def receive_batch( await self._reconnector.wait_message() return self._reconnector.receive_batch_nowait() + async def receive_message(self) -> typing.Union[datatypes.PublicMessage, None]: + """ + Block until receive new message + + use asyncio.wait_for for wait with timeout. + """ + await self._reconnector.wait_message() + return self._reconnector.receive_message_nowait() + async def commit_on_exit( self, mess: datatypes.ICommittable ) -> typing.AsyncContextManager: @@ -244,6 +245,9 @@ async def wait_message(self): def receive_batch_nowait(self): return self._stream_reader.receive_batch_nowait() + def receive_message_nowait(self): + return self._stream_reader.receive_message_nowait() + def commit( self, batch: datatypes.ICommittable ) -> datatypes.PartitionSession.CommitAckWaiter: @@ -397,12 +401,25 @@ def receive_batch_nowait(self): raise self._get_first_error() if not self._message_batches: - return + return None batch = self._message_batches.popleft() self._buffer_release_bytes(batch._bytes_size) return batch + def receive_message_nowait(self): + try: + batch = self._message_batches[0] + message = batch.pop_message() + except IndexError: + return None + + if batch.is_empty(): + self._message_batches.popleft() + + return message + + def commit( self, batch: datatypes.ICommittable ) -> datatypes.PartitionSession.CommitAckWaiter: diff --git a/ydb/_topic_reader/topic_reader_asyncio_test.py b/ydb/_topic_reader/topic_reader_asyncio_test.py index 2924cb4d..a153e25c 100644 --- a/ydb/_topic_reader/topic_reader_asyncio_test.py +++ b/ydb/_topic_reader/topic_reader_asyncio_test.py @@ -4,6 +4,7 @@ import datetime import gzip import typing +from collections import deque from dataclasses import dataclass from unittest import mock @@ -53,6 +54,34 @@ def default_executor(): executor.shutdown() +def stub_partition_session(): + return datatypes.PartitionSession( + id=0, + state=datatypes.PartitionSession.State.Active, + topic_path="asd", + partition_id=1, + committed_offset=0, + reader_reconnector_id=415, + reader_stream_id=513, + ) + + +def stub_message(id: int): + return PublicMessage( + seqno=id, + created_at=datetime.datetime(2023, 3, 18, 14, 15), + message_group_id="", + session_metadata={}, + offset=0, + written_at=datetime.datetime(2023, 3, 18, 14, 15), + producer_id="", + data=bytes(), + _partition_session=stub_partition_session(), + _commit_start_offset=0, + _commit_end_offset=1, + ) + + @pytest.fixture() def default_reader_settings(default_executor): return PublicReaderSettings( @@ -85,7 +114,7 @@ def stream(self): @pytest.fixture() def partition_session( - self, default_reader_settings, stream_reader_started: ReaderStream + self, default_reader_settings, stream_reader_started: ReaderStream ) -> datatypes.PartitionSession: partition_session = datatypes.PartitionSession( id=2, @@ -106,7 +135,7 @@ def partition_session( @pytest.fixture() def second_partition_session( - self, default_reader_settings, stream_reader_started: ReaderStream + self, default_reader_settings, stream_reader_started: ReaderStream ): partition_session = datatypes.PartitionSession( id=12, @@ -157,7 +186,7 @@ async def get_started_reader(self, stream, *args, **kwargs) -> ReaderStream: @pytest.fixture() async def stream_reader_started( - self, stream, default_reader_settings + self, stream, default_reader_settings ) -> ReaderStream: return await self.get_started_reader(stream, default_reader_settings) @@ -170,7 +199,7 @@ async def stream_reader(self, stream_reader_started: ReaderStream): @pytest.fixture() async def stream_reader_finish_with_error( - self, stream_reader_started: ReaderStream + self, stream_reader_started: ReaderStream ): yield stream_reader_started @@ -179,7 +208,7 @@ async def stream_reader_finish_with_error( @staticmethod def create_message( - partition_session: datatypes.PartitionSession, seqno: int, offset_delta: int + partition_session: typing.Optional[datatypes.PartitionSession], seqno: int, offset_delta: int ): return PublicMessage( seqno=seqno, @@ -187,17 +216,17 @@ def create_message( message_group_id="test-message-group", session_metadata={}, offset=partition_session._next_message_start_commit_offset - + offset_delta - - 1, + + offset_delta + - 1, written_at=datetime.datetime(2023, 2, 3, 14, 16), producer_id="test-producer-id", data=bytes(), _partition_session=partition_session, _commit_start_offset=partition_session._next_message_start_commit_offset - + offset_delta - - 1, + + offset_delta + - 1, _commit_end_offset=partition_session._next_message_start_commit_offset - + offset_delta, + + offset_delta, ) async def send_message(self, stream_reader, message: PublicMessage): @@ -257,28 +286,28 @@ class TestError(Exception): "commit,send_range", [ ( - OffsetsRange( - partition_session_committed_offset, - partition_session_committed_offset + 1, - ), - True, + OffsetsRange( + partition_session_committed_offset, + partition_session_committed_offset + 1, + ), + True, ), ( - OffsetsRange( - partition_session_committed_offset - 1, - partition_session_committed_offset, - ), - False, + OffsetsRange( + partition_session_committed_offset - 1, + partition_session_committed_offset, + ), + False, ), ], ) async def test_send_commit_messages( - self, - stream, - stream_reader: ReaderStream, - partition_session, - commit: OffsetsRange, - send_range: bool, + self, + stream, + stream_reader: ReaderStream, + partition_session, + commit: OffsetsRange, + send_range: bool, ): @dataclass class Commitable(datatypes.ICommittable): @@ -318,7 +347,7 @@ async def wait_message(): assert start_ack_waiters == partition_session._ack_waiters async def test_commit_ack_received( - self, stream_reader, stream, partition_session, second_partition_session + self, stream_reader, stream, partition_session, second_partition_session ): offset1 = self.partition_session_committed_offset + 1 waiter1 = partition_session.add_waiter(offset1) @@ -348,7 +377,7 @@ async def test_commit_ack_received( await wait_for_fast(waiter2.future) async def test_close_ack_waiters_when_close_stream_reader( - self, stream_reader_started: ReaderStream, partition_session + self, stream_reader_started: ReaderStream, partition_session ): waiter = partition_session.add_waiter( self.partition_session_committed_offset + 1 @@ -359,7 +388,7 @@ async def test_close_ack_waiters_when_close_stream_reader( waiter.future.result() async def test_commit_ranges_for_received_messages( - self, stream, stream_reader_started: ReaderStream, partition_session + self, stream, stream_reader_started: ReaderStream, partition_session ): m1 = self.create_message(partition_session, 1, 1) m2 = self.create_message(partition_session, 2, 10) @@ -381,131 +410,131 @@ async def test_commit_ranges_for_received_messages( "batch,data_out", [ ( - PublicBatch( - session_metadata={}, - messages=[ - PublicMessage( - seqno=1, - created_at=datetime.datetime(2023, 3, 14, 15, 41), - message_group_id="", - session_metadata={}, - offset=1, - written_at=datetime.datetime(2023, 3, 14, 15, 42), - producer_id="asd", - data=rb"123", - _partition_session=None, - _commit_start_offset=5, - _commit_end_offset=15, - ) - ], - _partition_session=None, - _bytes_size=0, - _codec=Codec.CODEC_RAW, - ), - [bytes(rb"123")], + PublicBatch( + session_metadata={}, + messages=[ + PublicMessage( + seqno=1, + created_at=datetime.datetime(2023, 3, 14, 15, 41), + message_group_id="", + session_metadata={}, + offset=1, + written_at=datetime.datetime(2023, 3, 14, 15, 42), + producer_id="asd", + data=rb"123", + _partition_session=None, + _commit_start_offset=5, + _commit_end_offset=15, + ) + ], + _partition_session=None, + _bytes_size=0, + _codec=Codec.CODEC_RAW, + ), + [bytes(rb"123")], ), ( - PublicBatch( - session_metadata={}, - messages=[ - PublicMessage( - seqno=1, - created_at=datetime.datetime(2023, 3, 14, 15, 41), - message_group_id="", - session_metadata={}, - offset=1, - written_at=datetime.datetime(2023, 3, 14, 15, 42), - producer_id="asd", - data=gzip.compress(rb"123"), - _partition_session=None, - _commit_start_offset=5, - _commit_end_offset=15, - ) - ], - _partition_session=None, - _bytes_size=0, - _codec=Codec.CODEC_GZIP, - ), - [bytes(rb"123")], + PublicBatch( + session_metadata={}, + messages=[ + PublicMessage( + seqno=1, + created_at=datetime.datetime(2023, 3, 14, 15, 41), + message_group_id="", + session_metadata={}, + offset=1, + written_at=datetime.datetime(2023, 3, 14, 15, 42), + producer_id="asd", + data=gzip.compress(rb"123"), + _partition_session=None, + _commit_start_offset=5, + _commit_end_offset=15, + ) + ], + _partition_session=None, + _bytes_size=0, + _codec=Codec.CODEC_GZIP, + ), + [bytes(rb"123")], ), ( - PublicBatch( - session_metadata={}, - messages=[ - PublicMessage( - seqno=1, - created_at=datetime.datetime(2023, 3, 14, 15, 41), - message_group_id="", - session_metadata={}, - offset=1, - written_at=datetime.datetime(2023, 3, 14, 15, 42), - producer_id="asd", - data=rb"123", - _partition_session=None, - _commit_start_offset=5, - _commit_end_offset=15, - ), - PublicMessage( - seqno=1, - created_at=datetime.datetime(2023, 3, 14, 15, 41), - message_group_id="", - session_metadata={}, - offset=1, - written_at=datetime.datetime(2023, 3, 14, 15, 42), - producer_id="asd", - data=rb"456", - _partition_session=None, - _commit_start_offset=5, - _commit_end_offset=15, - ), - ], - _partition_session=None, - _bytes_size=0, - _codec=Codec.CODEC_RAW, - ), - [bytes(rb"123"), bytes(rb"456")], + PublicBatch( + session_metadata={}, + messages=[ + PublicMessage( + seqno=1, + created_at=datetime.datetime(2023, 3, 14, 15, 41), + message_group_id="", + session_metadata={}, + offset=1, + written_at=datetime.datetime(2023, 3, 14, 15, 42), + producer_id="asd", + data=rb"123", + _partition_session=None, + _commit_start_offset=5, + _commit_end_offset=15, + ), + PublicMessage( + seqno=1, + created_at=datetime.datetime(2023, 3, 14, 15, 41), + message_group_id="", + session_metadata={}, + offset=1, + written_at=datetime.datetime(2023, 3, 14, 15, 42), + producer_id="asd", + data=rb"456", + _partition_session=None, + _commit_start_offset=5, + _commit_end_offset=15, + ), + ], + _partition_session=None, + _bytes_size=0, + _codec=Codec.CODEC_RAW, + ), + [bytes(rb"123"), bytes(rb"456")], ), ( - PublicBatch( - session_metadata={}, - messages=[ - PublicMessage( - seqno=1, - created_at=datetime.datetime(2023, 3, 14, 15, 41), - message_group_id="", - session_metadata={}, - offset=1, - written_at=datetime.datetime(2023, 3, 14, 15, 42), - producer_id="asd", - data=gzip.compress(rb"123"), - _partition_session=None, - _commit_start_offset=5, - _commit_end_offset=15, - ), - PublicMessage( - seqno=1, - created_at=datetime.datetime(2023, 3, 14, 15, 41), - message_group_id="", - session_metadata={}, - offset=1, - written_at=datetime.datetime(2023, 3, 14, 15, 42), - producer_id="asd", - data=gzip.compress(rb"456"), - _partition_session=None, - _commit_start_offset=5, - _commit_end_offset=15, - ), - ], - _partition_session=None, - _bytes_size=0, - _codec=Codec.CODEC_GZIP, - ), - [bytes(rb"123"), bytes(rb"456")], + PublicBatch( + session_metadata={}, + messages=[ + PublicMessage( + seqno=1, + created_at=datetime.datetime(2023, 3, 14, 15, 41), + message_group_id="", + session_metadata={}, + offset=1, + written_at=datetime.datetime(2023, 3, 14, 15, 42), + producer_id="asd", + data=gzip.compress(rb"123"), + _partition_session=None, + _commit_start_offset=5, + _commit_end_offset=15, + ), + PublicMessage( + seqno=1, + created_at=datetime.datetime(2023, 3, 14, 15, 41), + message_group_id="", + session_metadata={}, + offset=1, + written_at=datetime.datetime(2023, 3, 14, 15, 42), + producer_id="asd", + data=gzip.compress(rb"456"), + _partition_session=None, + _commit_start_offset=5, + _commit_end_offset=15, + ), + ], + _partition_session=None, + _bytes_size=0, + _codec=Codec.CODEC_GZIP, + ), + [bytes(rb"123"), bytes(rb"456")], ), ], ) async def test_decode_loop( - self, stream_reader, batch: PublicBatch, data_out: typing.List[bytes] + self, stream_reader, batch: PublicBatch, data_out: typing.List[bytes] ): assert len(batch.messages) == len(data_out) @@ -520,7 +549,7 @@ async def test_decode_loop( assert batch == expected async def test_error_from_status_code( - self, stream, stream_reader_finish_with_error + self, stream, stream_reader_finish_with_error ): # noinspection PyTypeChecker stream.from_server.put_nowait( @@ -580,11 +609,11 @@ async def test_init_reader(self, stream, default_reader_settings): await reader.close() async def test_start_partition( - self, - stream_reader: ReaderStream, - stream, - default_reader_settings, - partition_session, + self, + stream_reader: ReaderStream, + stream, + default_reader_settings, + partition_session, ): def session_count(): return len(stream_reader._partition_sessions) @@ -624,8 +653,8 @@ def session_count(): assert len(stream_reader._partition_sessions) == initial_session_count + 1 assert stream_reader._partition_sessions[ - test_partition_session_id - ] == datatypes.PartitionSession( + test_partition_session_id + ] == datatypes.PartitionSession( id=test_partition_session_id, state=datatypes.PartitionSession.State.Active, topic_path=test_topic_path, @@ -660,7 +689,7 @@ def session_count(): assert partition_session.id not in stream_reader._partition_sessions async def test_partition_stop_graceful( - self, stream, stream_reader, partition_session + self, stream, stream_reader, partition_session ): def session_count(): return len(stream_reader._partition_sessions) @@ -703,11 +732,11 @@ def session_count(): stream.from_client.get_nowait() async def test_receive_message_from_server( - self, - stream_reader, - stream, - partition_session: datatypes.PartitionSession, - second_partition_session, + self, + stream_reader, + stream, + partition_session: datatypes.PartitionSession, + second_partition_session, ): def reader_batch_count(): return len(stream_reader._message_batches) @@ -785,7 +814,7 @@ def reader_batch_count(): ) async def test_read_batches( - self, stream_reader, partition_session, second_partition_session + self, stream_reader, partition_session, second_partition_session ): created_at = datetime.datetime(2020, 2, 1, 18, 12) created_at2 = datetime.datetime(2020, 2, 2, 18, 12) @@ -963,6 +992,102 @@ async def test_read_batches( _codec=Codec.CODEC_RAW, ) + @pytest.mark.parametrize( + 'batches_before,expected_message,batches_after', + [ + ( + [], + None, + [] + ), + ( + [PublicBatch( + session_metadata={}, + messages=[stub_message(1)], + _partition_session=stub_partition_session(), + _bytes_size=0, + _codec=Codec.CODEC_RAW, + )], + stub_message(1), + [] + ), + ( + [ + PublicBatch( + session_metadata={}, + messages=[stub_message(1), stub_message(2)], + _partition_session=stub_partition_session(), + _bytes_size=0, + _codec=Codec.CODEC_RAW, + ), + PublicBatch( + session_metadata={}, + messages=[stub_message(3), stub_message(4)], + _partition_session=stub_partition_session(), + _bytes_size=0, + _codec=Codec.CODEC_RAW, + ) + ], + stub_message(1), + [ + PublicBatch( + session_metadata={}, + messages=[stub_message(2)], + _partition_session=stub_partition_session(), + _bytes_size=0, + _codec=Codec.CODEC_RAW, + ), + PublicBatch( + session_metadata={}, + messages=[stub_message(3), stub_message(4)], + _partition_session=stub_partition_session(), + _bytes_size=0, + _codec=Codec.CODEC_RAW, + ) + ], + ), + ( + [ + PublicBatch( + session_metadata={}, + messages=[stub_message(1)], + _partition_session=stub_partition_session(), + _bytes_size=0, + _codec=Codec.CODEC_RAW, + ), + PublicBatch( + session_metadata={}, + messages=[stub_message(2), stub_message(3)], + _partition_session=stub_partition_session(), + _bytes_size=0, + _codec=Codec.CODEC_RAW, + ), + ], + stub_message(1), + [PublicBatch( + session_metadata={}, + messages=[stub_message(2), stub_message(3)], + _partition_session=stub_partition_session(), + _bytes_size=0, + _codec=Codec.CODEC_RAW, + )], + ), + + ] + ) + async def test_read_message( + self, + stream_reader, + batches_before: typing.List[datatypes.PublicBatch], + expected_message: PublicMessage, + batches_after: typing.List[datatypes.PublicBatch], + ): + stream_reader._message_batches = deque(batches_before) + mess = stream_reader.receive_message_nowait() + + assert mess == expected_message + assert list(stream_reader._message_batches) == batches_after + async def test_receive_batch_nowait(self, stream, stream_reader, partition_session): assert stream_reader.receive_batch_nowait() is None @@ -993,17 +1118,17 @@ async def test_receive_batch_nowait(self, stream, stream_reader, partition_sessi ) assert ( - stream_reader._buffer_size_bytes - == initial_buffer_size + 2 * self.default_batch_size + stream_reader._buffer_size_bytes + == initial_buffer_size + 2 * self.default_batch_size ) assert ( - StreamReadMessage.ReadRequest(self.default_batch_size) - == stream.from_client.get_nowait().client_message + StreamReadMessage.ReadRequest(self.default_batch_size) + == stream.from_client.get_nowait().client_message ) assert ( - StreamReadMessage.ReadRequest(self.default_batch_size) - == stream.from_client.get_nowait().client_message + StreamReadMessage.ReadRequest(self.default_batch_size) + == stream.from_client.get_nowait().client_message ) with pytest.raises(asyncio.QueueEmpty): @@ -1068,9 +1193,9 @@ async def wait_messages(): stream_index = 0 async def stream_create( - reader_reconnector_id: int, - driver: SupportedDriverType, - settings: PublicReaderSettings, + reader_reconnector_id: int, + driver: SupportedDriverType, + settings: PublicReaderSettings, ): nonlocal stream_index stream_index += 1 diff --git a/ydb/_topic_reader/topic_reader_sync.py b/ydb/_topic_reader/topic_reader_sync.py index ec243337..17706c79 100644 --- a/ydb/_topic_reader/topic_reader_sync.py +++ b/ydb/_topic_reader/topic_reader_sync.py @@ -5,6 +5,7 @@ from ydb._grpc.grpcwrapper.common_utils import SupportedDriverType from ydb._topic_common.common import _get_shared_event_loop +from ydb._topic_reader import datatypes from ydb._topic_reader.datatypes import PublicMessage, PublicBatch, ICommittable from ydb._topic_reader.topic_reader import ( PublicReaderSettings, @@ -72,6 +73,19 @@ def _call_sync(self, coro: Coroutine, timeout): f.cancel() raise + def _call_nowait(self, callback: typing.Callable[[], typing.Any]) -> typing.Any: + res = concurrent.futures.Future() + + def call(): + try: + res.set_result(call()) + except BaseException as err: + res.set_exception(err) + + self._loop.call_soon_threadsafe(call) + + return res.result() + def async_sessions_stat(self) -> concurrent.futures.Future: """ Receive stat from the server, return feature. @@ -100,15 +114,23 @@ def messages( """ raise NotImplementedError() - def receive_message(self, *, timeout: Union[float, None] = None) -> PublicMessage: + def receive_message(self, *, timeout: Union[float, None] = None) -> datatypes.PublicMessage: """ Block until receive new message It has no async_ version for prevent lost messages, use async_wait_message as signal for new batches available. + receive_message(timeout=0) may return None even right after async_wait_message() is ok - because lost of partition + or connection to server lost if no new message in timeout seconds (default - infinite): raise TimeoutError() if timeout <= 0 - it will fast non block method, get messages from internal buffer only. """ - raise NotImplementedError() + if timeout <= 0: + return self._receive_message_nowait() + + return self._call_sync(self._async_reader.receive_message(), timeout) + + def _receive_message_nowait(self) -> Optional[datatypes.PublicMessage]: + return self._call_nowait(lambda: self._async_reader._reconnector.receive_message_nowait()) def async_wait_message(self) -> concurrent.futures.Future: """ @@ -118,7 +140,7 @@ def async_wait_message(self) -> concurrent.futures.Future: Possible situation when receive signal about message available, but no messages when try to receive a message. If message expired between send event and try to retrieve message (for example connection broken). """ - raise NotImplementedError() + return self._call(self._async_reader._reconnector.wait_message()) def batches( self, @@ -157,6 +179,9 @@ def receive_batch( timeout, ) + def _receive_batch_nowait(self) -> Optional[PublicBatch]: + return self._call_nowait(lambda: self._async_reader._reconnector.receive_batch_nowait()) + def commit(self, mess: ICommittable): """ Put commit message to internal buffer. From 60632774b4b98a5db3cd96e1a85b266e7e3d2606 Mon Sep 17 00:00:00 2001 From: Timofey Koolin Date: Mon, 20 Mar 2023 13:25:41 +0300 Subject: [PATCH 2/5] merge --- ydb/_topic_reader/topic_reader_asyncio.py | 1 - .../topic_reader_asyncio_test.py | 475 +++++++++--------- ydb/_topic_reader/topic_reader_sync.py | 13 +- 3 files changed, 247 insertions(+), 242 deletions(-) diff --git a/ydb/_topic_reader/topic_reader_asyncio.py b/ydb/_topic_reader/topic_reader_asyncio.py index c74f7d09..7266ae43 100644 --- a/ydb/_topic_reader/topic_reader_asyncio.py +++ b/ydb/_topic_reader/topic_reader_asyncio.py @@ -419,7 +419,6 @@ def receive_message_nowait(self): return message - def commit( self, batch: datatypes.ICommittable ) -> datatypes.PartitionSession.CommitAckWaiter: diff --git a/ydb/_topic_reader/topic_reader_asyncio_test.py b/ydb/_topic_reader/topic_reader_asyncio_test.py index a153e25c..a310298e 100644 --- a/ydb/_topic_reader/topic_reader_asyncio_test.py +++ b/ydb/_topic_reader/topic_reader_asyncio_test.py @@ -114,7 +114,7 @@ def stream(self): @pytest.fixture() def partition_session( - self, default_reader_settings, stream_reader_started: ReaderStream + self, default_reader_settings, stream_reader_started: ReaderStream ) -> datatypes.PartitionSession: partition_session = datatypes.PartitionSession( id=2, @@ -135,7 +135,7 @@ def partition_session( @pytest.fixture() def second_partition_session( - self, default_reader_settings, stream_reader_started: ReaderStream + self, default_reader_settings, stream_reader_started: ReaderStream ): partition_session = datatypes.PartitionSession( id=12, @@ -186,7 +186,7 @@ async def get_started_reader(self, stream, *args, **kwargs) -> ReaderStream: @pytest.fixture() async def stream_reader_started( - self, stream, default_reader_settings + self, stream, default_reader_settings ) -> ReaderStream: return await self.get_started_reader(stream, default_reader_settings) @@ -199,7 +199,7 @@ async def stream_reader(self, stream_reader_started: ReaderStream): @pytest.fixture() async def stream_reader_finish_with_error( - self, stream_reader_started: ReaderStream + self, stream_reader_started: ReaderStream ): yield stream_reader_started @@ -208,7 +208,9 @@ async def stream_reader_finish_with_error( @staticmethod def create_message( - partition_session: typing.Optional[datatypes.PartitionSession], seqno: int, offset_delta: int + partition_session: typing.Optional[datatypes.PartitionSession], + seqno: int, + offset_delta: int, ): return PublicMessage( seqno=seqno, @@ -216,17 +218,17 @@ def create_message( message_group_id="test-message-group", session_metadata={}, offset=partition_session._next_message_start_commit_offset - + offset_delta - - 1, + + offset_delta + - 1, written_at=datetime.datetime(2023, 2, 3, 14, 16), producer_id="test-producer-id", data=bytes(), _partition_session=partition_session, _commit_start_offset=partition_session._next_message_start_commit_offset - + offset_delta - - 1, + + offset_delta + - 1, _commit_end_offset=partition_session._next_message_start_commit_offset - + offset_delta, + + offset_delta, ) async def send_message(self, stream_reader, message: PublicMessage): @@ -286,28 +288,28 @@ class TestError(Exception): "commit,send_range", [ ( - OffsetsRange( - partition_session_committed_offset, - partition_session_committed_offset + 1, - ), - True, + OffsetsRange( + partition_session_committed_offset, + partition_session_committed_offset + 1, + ), + True, ), ( - OffsetsRange( - partition_session_committed_offset - 1, - partition_session_committed_offset, - ), - False, + OffsetsRange( + partition_session_committed_offset - 1, + partition_session_committed_offset, + ), + False, ), ], ) async def test_send_commit_messages( - self, - stream, - stream_reader: ReaderStream, - partition_session, - commit: OffsetsRange, - send_range: bool, + self, + stream, + stream_reader: ReaderStream, + partition_session, + commit: OffsetsRange, + send_range: bool, ): @dataclass class Commitable(datatypes.ICommittable): @@ -347,7 +349,7 @@ async def wait_message(): assert start_ack_waiters == partition_session._ack_waiters async def test_commit_ack_received( - self, stream_reader, stream, partition_session, second_partition_session + self, stream_reader, stream, partition_session, second_partition_session ): offset1 = self.partition_session_committed_offset + 1 waiter1 = partition_session.add_waiter(offset1) @@ -377,7 +379,7 @@ async def test_commit_ack_received( await wait_for_fast(waiter2.future) async def test_close_ack_waiters_when_close_stream_reader( - self, stream_reader_started: ReaderStream, partition_session + self, stream_reader_started: ReaderStream, partition_session ): waiter = partition_session.add_waiter( self.partition_session_committed_offset + 1 @@ -388,7 +390,7 @@ async def test_close_ack_waiters_when_close_stream_reader( waiter.future.result() async def test_commit_ranges_for_received_messages( - self, stream, stream_reader_started: ReaderStream, partition_session + self, stream, stream_reader_started: ReaderStream, partition_session ): m1 = self.create_message(partition_session, 1, 1) m2 = self.create_message(partition_session, 2, 10) @@ -410,131 +412,131 @@ async def test_commit_ranges_for_received_messages( "batch,data_out", [ ( - PublicBatch( - session_metadata={}, - messages=[ - PublicMessage( - seqno=1, - created_at=datetime.datetime(2023, 3, 14, 15, 41), - message_group_id="", - session_metadata={}, - offset=1, - written_at=datetime.datetime(2023, 3, 14, 15, 42), - producer_id="asd", - data=rb"123", - _partition_session=None, - _commit_start_offset=5, - _commit_end_offset=15, - ) - ], - _partition_session=None, - _bytes_size=0, - _codec=Codec.CODEC_RAW, - ), - [bytes(rb"123")], + PublicBatch( + session_metadata={}, + messages=[ + PublicMessage( + seqno=1, + created_at=datetime.datetime(2023, 3, 14, 15, 41), + message_group_id="", + session_metadata={}, + offset=1, + written_at=datetime.datetime(2023, 3, 14, 15, 42), + producer_id="asd", + data=rb"123", + _partition_session=None, + _commit_start_offset=5, + _commit_end_offset=15, + ) + ], + _partition_session=None, + _bytes_size=0, + _codec=Codec.CODEC_RAW, + ), + [bytes(rb"123")], ), ( - PublicBatch( - session_metadata={}, - messages=[ - PublicMessage( - seqno=1, - created_at=datetime.datetime(2023, 3, 14, 15, 41), - message_group_id="", - session_metadata={}, - offset=1, - written_at=datetime.datetime(2023, 3, 14, 15, 42), - producer_id="asd", - data=gzip.compress(rb"123"), - _partition_session=None, - _commit_start_offset=5, - _commit_end_offset=15, - ) - ], - _partition_session=None, - _bytes_size=0, - _codec=Codec.CODEC_GZIP, - ), - [bytes(rb"123")], + PublicBatch( + session_metadata={}, + messages=[ + PublicMessage( + seqno=1, + created_at=datetime.datetime(2023, 3, 14, 15, 41), + message_group_id="", + session_metadata={}, + offset=1, + written_at=datetime.datetime(2023, 3, 14, 15, 42), + producer_id="asd", + data=gzip.compress(rb"123"), + _partition_session=None, + _commit_start_offset=5, + _commit_end_offset=15, + ) + ], + _partition_session=None, + _bytes_size=0, + _codec=Codec.CODEC_GZIP, + ), + [bytes(rb"123")], ), ( - PublicBatch( - session_metadata={}, - messages=[ - PublicMessage( - seqno=1, - created_at=datetime.datetime(2023, 3, 14, 15, 41), - message_group_id="", - session_metadata={}, - offset=1, - written_at=datetime.datetime(2023, 3, 14, 15, 42), - producer_id="asd", - data=rb"123", - _partition_session=None, - _commit_start_offset=5, - _commit_end_offset=15, - ), - PublicMessage( - seqno=1, - created_at=datetime.datetime(2023, 3, 14, 15, 41), - message_group_id="", - session_metadata={}, - offset=1, - written_at=datetime.datetime(2023, 3, 14, 15, 42), - producer_id="asd", - data=rb"456", - _partition_session=None, - _commit_start_offset=5, - _commit_end_offset=15, - ), - ], - _partition_session=None, - _bytes_size=0, - _codec=Codec.CODEC_RAW, - ), - [bytes(rb"123"), bytes(rb"456")], + PublicBatch( + session_metadata={}, + messages=[ + PublicMessage( + seqno=1, + created_at=datetime.datetime(2023, 3, 14, 15, 41), + message_group_id="", + session_metadata={}, + offset=1, + written_at=datetime.datetime(2023, 3, 14, 15, 42), + producer_id="asd", + data=rb"123", + _partition_session=None, + _commit_start_offset=5, + _commit_end_offset=15, + ), + PublicMessage( + seqno=1, + created_at=datetime.datetime(2023, 3, 14, 15, 41), + message_group_id="", + session_metadata={}, + offset=1, + written_at=datetime.datetime(2023, 3, 14, 15, 42), + producer_id="asd", + data=rb"456", + _partition_session=None, + _commit_start_offset=5, + _commit_end_offset=15, + ), + ], + _partition_session=None, + _bytes_size=0, + _codec=Codec.CODEC_RAW, + ), + [bytes(rb"123"), bytes(rb"456")], ), ( - PublicBatch( - session_metadata={}, - messages=[ - PublicMessage( - seqno=1, - created_at=datetime.datetime(2023, 3, 14, 15, 41), - message_group_id="", - session_metadata={}, - offset=1, - written_at=datetime.datetime(2023, 3, 14, 15, 42), - producer_id="asd", - data=gzip.compress(rb"123"), - _partition_session=None, - _commit_start_offset=5, - _commit_end_offset=15, - ), - PublicMessage( - seqno=1, - created_at=datetime.datetime(2023, 3, 14, 15, 41), - message_group_id="", - session_metadata={}, - offset=1, - written_at=datetime.datetime(2023, 3, 14, 15, 42), - producer_id="asd", - data=gzip.compress(rb"456"), - _partition_session=None, - _commit_start_offset=5, - _commit_end_offset=15, - ), - ], - _partition_session=None, - _bytes_size=0, - _codec=Codec.CODEC_GZIP, - ), - [bytes(rb"123"), bytes(rb"456")], + PublicBatch( + session_metadata={}, + messages=[ + PublicMessage( + seqno=1, + created_at=datetime.datetime(2023, 3, 14, 15, 41), + message_group_id="", + session_metadata={}, + offset=1, + written_at=datetime.datetime(2023, 3, 14, 15, 42), + producer_id="asd", + data=gzip.compress(rb"123"), + _partition_session=None, + _commit_start_offset=5, + _commit_end_offset=15, + ), + PublicMessage( + seqno=1, + created_at=datetime.datetime(2023, 3, 14, 15, 41), + message_group_id="", + session_metadata={}, + offset=1, + written_at=datetime.datetime(2023, 3, 14, 15, 42), + producer_id="asd", + data=gzip.compress(rb"456"), + _partition_session=None, + _commit_start_offset=5, + _commit_end_offset=15, + ), + ], + _partition_session=None, + _bytes_size=0, + _codec=Codec.CODEC_GZIP, + ), + [bytes(rb"123"), bytes(rb"456")], ), ], ) async def test_decode_loop( - self, stream_reader, batch: PublicBatch, data_out: typing.List[bytes] + self, stream_reader, batch: PublicBatch, data_out: typing.List[bytes] ): assert len(batch.messages) == len(data_out) @@ -549,7 +551,7 @@ async def test_decode_loop( assert batch == expected async def test_error_from_status_code( - self, stream, stream_reader_finish_with_error + self, stream, stream_reader_finish_with_error ): # noinspection PyTypeChecker stream.from_server.put_nowait( @@ -609,11 +611,11 @@ async def test_init_reader(self, stream, default_reader_settings): await reader.close() async def test_start_partition( - self, - stream_reader: ReaderStream, - stream, - default_reader_settings, - partition_session, + self, + stream_reader: ReaderStream, + stream, + default_reader_settings, + partition_session, ): def session_count(): return len(stream_reader._partition_sessions) @@ -653,8 +655,8 @@ def session_count(): assert len(stream_reader._partition_sessions) == initial_session_count + 1 assert stream_reader._partition_sessions[ - test_partition_session_id - ] == datatypes.PartitionSession( + test_partition_session_id + ] == datatypes.PartitionSession( id=test_partition_session_id, state=datatypes.PartitionSession.State.Active, topic_path=test_topic_path, @@ -689,7 +691,7 @@ def session_count(): assert partition_session.id not in stream_reader._partition_sessions async def test_partition_stop_graceful( - self, stream, stream_reader, partition_session + self, stream, stream_reader, partition_session ): def session_count(): return len(stream_reader._partition_sessions) @@ -732,11 +734,11 @@ def session_count(): stream.from_client.get_nowait() async def test_receive_message_from_server( - self, - stream_reader, - stream, - partition_session: datatypes.PartitionSession, - second_partition_session, + self, + stream_reader, + stream, + partition_session: datatypes.PartitionSession, + second_partition_session, ): def reader_batch_count(): return len(stream_reader._message_batches) @@ -814,7 +816,7 @@ def reader_batch_count(): ) async def test_read_batches( - self, stream_reader, partition_session, second_partition_session + self, stream_reader, partition_session, second_partition_session ): created_at = datetime.datetime(2020, 2, 1, 18, 12) created_at2 = datetime.datetime(2020, 2, 2, 18, 12) @@ -993,94 +995,93 @@ async def test_read_batches( ) @pytest.mark.parametrize( - 'batches_before,expected_message,batches_after', + "batches_before,expected_message,batches_after", [ + ([], None, []), ( - [], - None, - [] - ), - ( - [PublicBatch( + [ + PublicBatch( session_metadata={}, messages=[stub_message(1)], _partition_session=stub_partition_session(), _bytes_size=0, _codec=Codec.CODEC_RAW, - )], - stub_message(1), - [] + ) + ], + stub_message(1), + [], ), ( - [ - PublicBatch( - session_metadata={}, - messages=[stub_message(1), stub_message(2)], - _partition_session=stub_partition_session(), - _bytes_size=0, - _codec=Codec.CODEC_RAW, - ), - PublicBatch( - session_metadata={}, - messages=[stub_message(3), stub_message(4)], - _partition_session=stub_partition_session(), - _bytes_size=0, - _codec=Codec.CODEC_RAW, - ) - ], - stub_message(1), - [ - PublicBatch( - session_metadata={}, - messages=[stub_message(2)], - _partition_session=stub_partition_session(), - _bytes_size=0, - _codec=Codec.CODEC_RAW, - ), - PublicBatch( - session_metadata={}, - messages=[stub_message(3), stub_message(4)], - _partition_session=stub_partition_session(), - _bytes_size=0, - _codec=Codec.CODEC_RAW, - ) - ], + [ + PublicBatch( + session_metadata={}, + messages=[stub_message(1), stub_message(2)], + _partition_session=stub_partition_session(), + _bytes_size=0, + _codec=Codec.CODEC_RAW, + ), + PublicBatch( + session_metadata={}, + messages=[stub_message(3), stub_message(4)], + _partition_session=stub_partition_session(), + _bytes_size=0, + _codec=Codec.CODEC_RAW, + ), + ], + stub_message(1), + [ + PublicBatch( + session_metadata={}, + messages=[stub_message(2)], + _partition_session=stub_partition_session(), + _bytes_size=0, + _codec=Codec.CODEC_RAW, + ), + PublicBatch( + session_metadata={}, + messages=[stub_message(3), stub_message(4)], + _partition_session=stub_partition_session(), + _bytes_size=0, + _codec=Codec.CODEC_RAW, + ), + ], ), ( - [ - PublicBatch( - session_metadata={}, - messages=[stub_message(1)], - _partition_session=stub_partition_session(), - _bytes_size=0, - _codec=Codec.CODEC_RAW, - ), - PublicBatch( - session_metadata={}, - messages=[stub_message(2), stub_message(3)], - _partition_session=stub_partition_session(), - _bytes_size=0, - _codec=Codec.CODEC_RAW, - ), - ], - stub_message(1), - [PublicBatch( + [ + PublicBatch( + session_metadata={}, + messages=[stub_message(1)], + _partition_session=stub_partition_session(), + _bytes_size=0, + _codec=Codec.CODEC_RAW, + ), + PublicBatch( + session_metadata={}, + messages=[stub_message(2), stub_message(3)], + _partition_session=stub_partition_session(), + _bytes_size=0, + _codec=Codec.CODEC_RAW, + ), + ], + stub_message(1), + [ + PublicBatch( session_metadata={}, messages=[stub_message(2), stub_message(3)], _partition_session=stub_partition_session(), _bytes_size=0, _codec=Codec.CODEC_RAW, - )], + ) + ], ), - - ] + ], ) async def test_read_message( - self, - stream_reader, - batches_before: typing.List[datatypes.PublicBatch], - expected_message: PublicMessage, - batches_after: typing.List[datatypes.PublicBatch], + self, + stream_reader, + batches_before: typing.List[datatypes.PublicBatch], + expected_message: PublicMessage, + batches_after: typing.List[datatypes.PublicBatch], ): stream_reader._message_batches = deque(batches_before) mess = stream_reader.receive_message_nowait() @@ -1118,17 +1119,17 @@ async def test_receive_batch_nowait(self, stream, stream_reader, partition_sessi ) assert ( - stream_reader._buffer_size_bytes - == initial_buffer_size + 2 * self.default_batch_size + stream_reader._buffer_size_bytes + == initial_buffer_size + 2 * self.default_batch_size ) assert ( - StreamReadMessage.ReadRequest(self.default_batch_size) - == stream.from_client.get_nowait().client_message + StreamReadMessage.ReadRequest(self.default_batch_size) + == stream.from_client.get_nowait().client_message ) assert ( - StreamReadMessage.ReadRequest(self.default_batch_size) - == stream.from_client.get_nowait().client_message + StreamReadMessage.ReadRequest(self.default_batch_size) + == stream.from_client.get_nowait().client_message ) with pytest.raises(asyncio.QueueEmpty): @@ -1193,9 +1194,9 @@ async def wait_messages(): stream_index = 0 async def stream_create( - reader_reconnector_id: int, - driver: SupportedDriverType, - settings: PublicReaderSettings, + reader_reconnector_id: int, + driver: SupportedDriverType, + settings: PublicReaderSettings, ): nonlocal stream_index stream_index += 1 diff --git a/ydb/_topic_reader/topic_reader_sync.py b/ydb/_topic_reader/topic_reader_sync.py index 2d54ead4..72543e76 100644 --- a/ydb/_topic_reader/topic_reader_sync.py +++ b/ydb/_topic_reader/topic_reader_sync.py @@ -87,7 +87,9 @@ def messages( """ raise NotImplementedError() - def receive_message(self, *, timeout: Union[float, None] = None) -> datatypes.PublicMessage: + def receive_message( + self, *, timeout: Union[float, None] = None + ) -> datatypes.PublicMessage: """ Block until receive new message It has no async_ version for prevent lost messages, use async_wait_message as signal for new batches available. @@ -103,7 +105,9 @@ def receive_message(self, *, timeout: Union[float, None] = None) -> datatypes.Pu return self._call_sync(self._async_reader.receive_message(), timeout) def _receive_message_nowait(self) -> Optional[datatypes.PublicMessage]: - return self._call_nowait(lambda: self._async_reader._reconnector.receive_message_nowait()) + return self._call_nowait( + lambda: self._async_reader._reconnector.receive_message_nowait() + ) def async_wait_message(self) -> concurrent.futures.Future: """ @@ -155,8 +159,9 @@ def receive_batch( ) def _receive_batch_nowait(self) -> Optional[PublicBatch]: - return self._caller.call_sync(lambda: self._async_reader._reconnector.receive_batch_nowait()) - + return self._caller.call_sync( + lambda: self._async_reader._reconnector.receive_batch_nowait() + ) def commit( self, mess: typing.Union[datatypes.PublicMessage, datatypes.PublicBatch] From a2cda766eb78b7c61a3bb7d433f5345c68353b23 Mon Sep 17 00:00:00 2001 From: Timofey Koolin Date: Mon, 20 Mar 2023 13:39:19 +0300 Subject: [PATCH 3/5] read one message --- tests/topics/test_topic_reader.py | 32 ++++++++++++++++++++++++-- ydb/_topic_reader/topic_reader_sync.py | 32 +++++++++++--------------- 2 files changed, 44 insertions(+), 20 deletions(-) diff --git a/tests/topics/test_topic_reader.py b/tests/topics/test_topic_reader.py index 2a451baf..84d61a43 100644 --- a/tests/topics/test_topic_reader.py +++ b/tests/topics/test_topic_reader.py @@ -5,12 +5,26 @@ @pytest.mark.asyncio class TestTopicReaderAsyncIO: + async def test_read_batch( + self, driver, topic_path, topic_with_messages, topic_consumer + ): + reader = driver.topic_client.reader(topic_consumer, topic_path) + batch = await reader.receive_batch() + + assert batch is not None + assert len(batch.messages) > 0 + + await reader.close() + async def test_read_message( self, driver, topic_path, topic_with_messages, topic_consumer ): reader = driver.topic_client.reader(topic_consumer, topic_path) + msg = await reader.receive_message() + + assert msg is not None + assert msg.seqno - assert await reader.receive_batch() is not None await reader.close() async def test_read_and_commit_message( @@ -59,12 +73,26 @@ def decode(b: bytes): class TestTopicReaderSync: + def test_read_batch( + self, driver_sync, topic_path, topic_with_messages, topic_consumer + ): + reader = driver_sync.topic_client.reader(topic_consumer, topic_path) + batch = reader.receive_batch() + + assert batch is not None + assert len(batch.messages) > 0 + + reader.close() + def test_read_message( self, driver_sync, topic_path, topic_with_messages, topic_consumer ): reader = driver_sync.topic_client.reader(topic_consumer, topic_path) + msg = reader.receive_message() + + assert msg is not None + assert msg.seqno - assert reader.receive_batch() is not None reader.close() def test_read_and_commit_message( diff --git a/ydb/_topic_reader/topic_reader_sync.py b/ydb/_topic_reader/topic_reader_sync.py index 72543e76..ed9730fa 100644 --- a/ydb/_topic_reader/topic_reader_sync.py +++ b/ydb/_topic_reader/topic_reader_sync.py @@ -83,12 +83,13 @@ def messages( It has no async_ version for prevent lost messages, use async_wait_message as signal for new batches available. if no new message in timeout seconds (default - infinite): stop iterations by raise StopIteration - if timeout <= 0 - it will fast non block method, get messages from internal buffer only. + if timeout <= 0 - it will fast wait only one event loop cycle - without wait any i/o operations or pauses, + get messages from internal buffer only. """ raise NotImplementedError() def receive_message( - self, *, timeout: Union[float, None] = None + self, *, timeout: TimeoutType = None ) -> datatypes.PublicMessage: """ Block until receive new message @@ -97,16 +98,12 @@ def receive_message( or connection to server lost if no new message in timeout seconds (default - infinite): raise TimeoutError() - if timeout <= 0 - it will fast non block method, get messages from internal buffer only. + if timeout <= 0 - it will fast wait only one event loop cycle - without wait any i/o operations or pauses, get messages from internal buffer only. """ - if timeout <= 0: - return self._receive_message_nowait() - - return self._call_sync(self._async_reader.receive_message(), timeout) + self._check_closed() - def _receive_message_nowait(self) -> Optional[datatypes.PublicMessage]: - return self._call_nowait( - lambda: self._async_reader._reconnector.receive_message_nowait() + return self._caller.safe_call_with_result( + self._async_reader.receive_message(), timeout ) def async_wait_message(self) -> concurrent.futures.Future: @@ -117,7 +114,11 @@ def async_wait_message(self) -> concurrent.futures.Future: Possible situation when receive signal about message available, but no messages when try to receive a message. If message expired between send event and try to retrieve message (for example connection broken). """ - return self._call(self._async_reader._reconnector.wait_message()) + self._check_closed() + + return self._caller.unsafe_call_with_future( + self._async_reader._reconnector.wait_message() + ) def batches( self, @@ -131,7 +132,7 @@ def batches( It has no async_ version for prevent lost messages, use async_wait_message as signal for new batches available. if no new message in timeout seconds (default - infinite): stop iterations by raise StopIteration - if timeout <= 0 - it will fast non block method, get messages from internal buffer only. + if timeout <= 0 - it will fast wait only one event loop cycle - without wait any i/o operations or pauses, get messages from internal buffer only. """ raise NotImplementedError() @@ -147,7 +148,7 @@ def receive_batch( It has no async_ version for prevent lost messages, use async_wait_message as signal for new batches available. if no new message in timeout seconds (default - infinite): raise TimeoutError() - if timeout <= 0 - it will fast non block method, get messages from internal buffer only. + if timeout <= 0 - it will fast wait only one event loop cycle - without wait any i/o operations or pauses, get messages from internal buffer only. """ self._check_closed() @@ -158,11 +159,6 @@ def receive_batch( timeout, ) - def _receive_batch_nowait(self) -> Optional[PublicBatch]: - return self._caller.call_sync( - lambda: self._async_reader._reconnector.receive_batch_nowait() - ) - def commit( self, mess: typing.Union[datatypes.PublicMessage, datatypes.PublicBatch] ): From c73d9f6475fc631eccce7083a9bc14082c380a01 Mon Sep 17 00:00:00 2001 From: Timofey Koolin Date: Tue, 21 Mar 2023 08:10:10 +0300 Subject: [PATCH 4/5] style fix --- ydb/_topic_reader/datatypes.py | 9 ++------- ydb/_topic_reader/topic_reader_asyncio.py | 4 ++-- 2 files changed, 4 insertions(+), 9 deletions(-) diff --git a/ydb/_topic_reader/datatypes.py b/ydb/_topic_reader/datatypes.py index 860525ab..cff0fed8 100644 --- a/ydb/_topic_reader/datatypes.py +++ b/ydb/_topic_reader/datatypes.py @@ -179,7 +179,7 @@ def _commit_get_offsets_range(self) -> OffsetsRange: self.messages[-1]._commit_get_offsets_range().end, ) - def is_empty(self) -> bool: + def empty(self) -> bool: return len(self.messages) == 0 # ISessionAlive implementation @@ -192,9 +192,4 @@ def is_alive(self) -> bool: ) def pop_message(self) -> PublicMessage: - if len(self.messages) == 0: - raise IndexError() - - res = self.messages[0] - self.messages = self.messages[1:] - return res + return self.messages.pop() diff --git a/ydb/_topic_reader/topic_reader_asyncio.py b/ydb/_topic_reader/topic_reader_asyncio.py index 7266ae43..0068e4ba 100644 --- a/ydb/_topic_reader/topic_reader_asyncio.py +++ b/ydb/_topic_reader/topic_reader_asyncio.py @@ -125,7 +125,7 @@ async def receive_batch( await self._reconnector.wait_message() return self._reconnector.receive_batch_nowait() - async def receive_message(self) -> typing.Union[datatypes.PublicMessage, None]: + async def receive_message(self) -> typing.Optional[datatypes.PublicMessage]: """ Block until receive new message @@ -414,7 +414,7 @@ def receive_message_nowait(self): except IndexError: return None - if batch.is_empty(): + if batch.empty(): self._message_batches.popleft() return message From 8c1092451b35013cac031281707cb8fab76e9c17 Mon Sep 17 00:00:00 2001 From: Timofey Koolin Date: Tue, 21 Mar 2023 09:39:56 +0300 Subject: [PATCH 5/5] fix typo --- ydb/_topic_reader/datatypes.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ydb/_topic_reader/datatypes.py b/ydb/_topic_reader/datatypes.py index cff0fed8..5376c76d 100644 --- a/ydb/_topic_reader/datatypes.py +++ b/ydb/_topic_reader/datatypes.py @@ -192,4 +192,4 @@ def is_alive(self) -> bool: ) def pop_message(self) -> PublicMessage: - return self.messages.pop() + return self.messages.pop(0)