From 67dc0dd7df03bddf5a1b26653a1f2f7334b1d3f2 Mon Sep 17 00:00:00 2001 From: Timofey Koolin Date: Wed, 22 Feb 2023 21:21:32 +0300 Subject: [PATCH 1/2] add sync reader --- tests/topics/test_topic_reader.py | 12 +- ydb/_topic_common/common.py | 37 +++++ ydb/_topic_reader/topic_reader.py | 127 ---------------- ydb/_topic_reader/topic_reader_sync.py | 195 +++++++++++++++++++++++++ ydb/_topic_writer/topic_writer_sync.py | 53 ++----- ydb/topic.py | 42 +++--- 6 files changed, 274 insertions(+), 192 deletions(-) create mode 100644 ydb/_topic_reader/topic_reader_sync.py diff --git a/tests/topics/test_topic_reader.py b/tests/topics/test_topic_reader.py index ac338fbd..8107ac16 100644 --- a/tests/topics/test_topic_reader.py +++ b/tests/topics/test_topic_reader.py @@ -2,7 +2,7 @@ @pytest.mark.asyncio -class TestTopicWriterAsyncIO: +class TestTopicReaderAsyncIO: async def test_read_message( self, driver, topic_path, topic_with_messages, topic_consumer ): @@ -10,3 +10,13 @@ async def test_read_message( assert await reader.receive_batch() is not None await reader.close() + + +class TestTopicReaderSync: + def test_read_message( + self, driver_sync, topic_path, topic_with_messages, topic_consumer + ): + reader = driver_sync.topic_client.topic_reader(topic_consumer, topic_path) + + assert reader.receive_batch() is not None + reader.close() diff --git a/ydb/_topic_common/common.py b/ydb/_topic_common/common.py index e325ca4b..5bb10654 100644 --- a/ydb/_topic_common/common.py +++ b/ydb/_topic_common/common.py @@ -1,4 +1,8 @@ +import asyncio +import concurrent.futures +import threading import typing +from typing import Optional from .. import operation, issues from .._grpc.grpcwrapper.common_utils import IFromProtoWithProtoType @@ -24,3 +28,36 @@ def wrapper(rpc_state, response_pb, driver=None): return result_type.from_proto(msg) return wrapper + + +_shared_event_loop_lock = threading.Lock() +_shared_event_loop = None # type: Optional[asyncio.AbstractEventLoop] + + +def _get_shared_event_loop() -> asyncio.AbstractEventLoop: + global _shared_event_loop + + if _shared_event_loop is not None: + return _shared_event_loop + + with _shared_event_loop_lock: + if _shared_event_loop is not None: + return _shared_event_loop + + event_loop_set_done = concurrent.futures.Future() + + def start_event_loop(): + event_loop = asyncio.new_event_loop() + event_loop_set_done.set_result(event_loop) + asyncio.set_event_loop(event_loop) + event_loop.run_forever() + + t = threading.Thread( + target=start_event_loop, + name="Common ydb topic writer event loop", + daemon=True, + ) + t.start() + + _shared_event_loop = event_loop_set_done.result() + return _shared_event_loop diff --git a/ydb/_topic_reader/topic_reader.py b/ydb/_topic_reader/topic_reader.py index 7bb6d934..4c9e63e1 100644 --- a/ydb/_topic_reader/topic_reader.py +++ b/ydb/_topic_reader/topic_reader.py @@ -1,4 +1,3 @@ -import concurrent.futures import enum import datetime from dataclasses import dataclass @@ -6,11 +5,9 @@ Union, Optional, List, - Iterable, ) from ..table import RetrySettings -from .datatypes import ICommittable, PublicBatch, PublicMessage from .._topic_common.common import TokenGetterFuncType from .._grpc.grpcwrapper.ydb_topic import StreamReadMessage, OffsetsRange @@ -26,130 +23,6 @@ def __init__(self, path, *, partitions: Union[None, int, List[int]] = None): self.partitions = partitions -class Reader(object): - def async_sessions_stat(self) -> concurrent.futures.Future: - """ - Receive stat from the server, return feature. 
- """ - raise NotImplementedError() - - async def sessions_stat(self) -> List["SessionStat"]: - """ - Receive stat from the server - - use async_sessions_stat for set explicit wait timeout - """ - raise NotImplementedError() - - def messages( - self, *, timeout: Union[float, None] = None - ) -> Iterable[PublicMessage]: - """ - todo? - - Block until receive new message - It has no async_ version for prevent lost messages, use async_wait_message as signal for new batches available. - - if no new message in timeout seconds (default - infinite): stop iterations by raise StopIteration - if timeout <= 0 - it will fast non block method, get messages from internal buffer only. - """ - raise NotImplementedError() - - def receive_message(self, *, timeout: Union[float, None] = None) -> PublicMessage: - """ - Block until receive new message - It has no async_ version for prevent lost messages, use async_wait_message as signal for new batches available. - - if no new message in timeout seconds (default - infinite): raise TimeoutError() - if timeout <= 0 - it will fast non block method, get messages from internal buffer only. - """ - raise NotImplementedError() - - def async_wait_message(self) -> concurrent.futures.Future: - """ - Return future, which will completed when the reader has least one message in queue. - If reader already has message - future will return completed. - - Possible situation when receive signal about message available, but no messages when try to receive a message. - If message expired between send event and try to retrieve message (for example connection broken). - """ - raise NotImplementedError() - - def batches( - self, - *, - max_messages: Union[int, None] = None, - max_bytes: Union[int, None] = None, - timeout: Union[float, None] = None, - ) -> Iterable[PublicBatch]: - """ - Block until receive new batch. - It has no async_ version for prevent lost messages, use async_wait_message as signal for new batches available. - - if no new message in timeout seconds (default - infinite): stop iterations by raise StopIteration - if timeout <= 0 - it will fast non block method, get messages from internal buffer only. - """ - raise NotImplementedError() - - def receive_batch( - self, - *, - max_messages: Union[int, None] = None, - max_bytes: Union[int, None], - timeout: Union[float, None] = None, - ) -> Union[PublicBatch, None]: - """ - Get one messages batch from reader - It has no async_ version for prevent lost messages, use async_wait_message as signal for new batches available. - - if no new message in timeout seconds (default - infinite): raise TimeoutError() - if timeout <= 0 - it will fast non block method, get messages from internal buffer only. - """ - raise NotImplementedError() - - def commit(self, mess: ICommittable): - """ - Put commit message to internal buffer. - - For the method no way check the commit result - (for example if lost connection - commits will not re-send and committed messages will receive again) - """ - raise NotImplementedError() - - def commit_with_ack( - self, mess: ICommittable - ) -> Union["CommitResult", List["CommitResult"]]: - """ - write commit message to a buffer and wait ack from the server. - - if receive in timeout seconds (default - infinite): raise TimeoutError() - """ - raise NotImplementedError() - - def async_commit_with_ack( - self, mess: ICommittable - ) -> Union["CommitResult", List["CommitResult"]]: - """ - write commit message to a buffer and return Future for wait result. 
- """ - raise NotImplementedError() - - def async_flush(self) -> concurrent.futures.Future: - """ - force send all commit messages from internal buffers to server and return Future for wait server acks. - """ - raise NotImplementedError() - - def flush(self): - """ - force send all commit messages from internal buffers to server and wait acks for all of them. - """ - raise NotImplementedError() - - def close(self): - raise NotImplementedError() - - @dataclass class PublicReaderSettings: consumer: str diff --git a/ydb/_topic_reader/topic_reader_sync.py b/ydb/_topic_reader/topic_reader_sync.py new file mode 100644 index 00000000..b30b547a --- /dev/null +++ b/ydb/_topic_reader/topic_reader_sync.py @@ -0,0 +1,195 @@ +import asyncio +import concurrent.futures +import typing +from typing import List, Union, Iterable, Optional, Coroutine + +from ydb._grpc.grpcwrapper.common_utils import SupportedDriverType +from ydb._topic_common.common import _get_shared_event_loop +from ydb._topic_reader.datatypes import PublicMessage, PublicBatch, ICommittable +from ydb._topic_reader.topic_reader import ( + PublicReaderSettings, + SessionStat, + CommitResult, +) +from ydb._topic_reader.topic_reader_asyncio import ( + PublicAsyncIOReader, + TopicReaderClosedError, +) + + +class TopicReaderSync: + _loop: asyncio.AbstractEventLoop + _async_reader: PublicAsyncIOReader + _closed: bool + + def __init__( + self, + driver: SupportedDriverType, + settings: PublicReaderSettings, + *, + eventloop: Optional[asyncio.AbstractEventLoop] = None, + ): + self._closed = False + + if eventloop: + self._loop = eventloop + else: + self._loop = _get_shared_event_loop() + + async def create_reader(): + return PublicAsyncIOReader(driver, settings) + + self._async_reader = asyncio.run_coroutine_threadsafe( + create_reader(), self._loop + ).result() + + def __del__(self): + self.close() + + def _call(self, coro): + if self._closed: + raise TopicReaderClosedError() + + return asyncio.run_coroutine_threadsafe(coro, self._loop) + + def _call_sync(self, coro: Coroutine, timeout): + f = self._call(coro) + try: + return f.result(timeout) + except TimeoutError: + f.cancel() + raise + + def async_sessions_stat(self) -> concurrent.futures.Future: + """ + Receive stat from the server, return feature. + """ + raise NotImplementedError() + + async def sessions_stat(self) -> List[SessionStat]: + """ + Receive stat from the server + + use async_sessions_stat for set explicit wait timeout + """ + raise NotImplementedError() + + def messages( + self, *, timeout: Union[float, None] = None + ) -> Iterable[PublicMessage]: + """ + todo? + + Block until receive new message + It has no async_ version for prevent lost messages, use async_wait_message as signal for new batches available. + + if no new message in timeout seconds (default - infinite): stop iterations by raise StopIteration + if timeout <= 0 - it will fast non block method, get messages from internal buffer only. + """ + raise NotImplementedError() + + def receive_message(self, *, timeout: Union[float, None] = None) -> PublicMessage: + """ + Block until receive new message + It has no async_ version for prevent lost messages, use async_wait_message as signal for new batches available. + + if no new message in timeout seconds (default - infinite): raise TimeoutError() + if timeout <= 0 - it will fast non block method, get messages from internal buffer only. 
+ """ + raise NotImplementedError() + + def async_wait_message(self) -> concurrent.futures.Future: + """ + Return future, which will completed when the reader has least one message in queue. + If reader already has message - future will return completed. + + Possible situation when receive signal about message available, but no messages when try to receive a message. + If message expired between send event and try to retrieve message (for example connection broken). + """ + raise NotImplementedError() + + def batches( + self, + *, + max_messages: Union[int, None] = None, + max_bytes: Union[int, None] = None, + timeout: Union[float, None] = None, + ) -> Iterable[PublicBatch]: + """ + Block until receive new batch. + It has no async_ version for prevent lost messages, use async_wait_message as signal for new batches available. + + if no new message in timeout seconds (default - infinite): stop iterations by raise StopIteration + if timeout <= 0 - it will fast non block method, get messages from internal buffer only. + """ + raise NotImplementedError() + + def receive_batch( + self, + *, + max_messages: typing.Union[int, None] = None, + max_bytes: typing.Union[int, None] = None, + timeout: Union[float, None] = None, + ) -> Union[PublicBatch, None]: + """ + Get one messages batch from reader + It has no async_ version for prevent lost messages, use async_wait_message as signal for new batches available. + + if no new message in timeout seconds (default - infinite): raise TimeoutError() + if timeout <= 0 - it will fast non block method, get messages from internal buffer only. + """ + return self._call_sync( + self._async_reader.receive_batch( + max_messages=max_messages, max_bytes=max_bytes + ), + timeout, + ) + + def commit(self, mess: ICommittable): + """ + Put commit message to internal buffer. + + For the method no way check the commit result + (for example if lost connection - commits will not re-send and committed messages will receive again) + """ + self._call_sync(self._async_reader.commit(mess), None) + + def commit_with_ack( + self, mess: ICommittable + ) -> Union[CommitResult, List[CommitResult]]: + """ + write commit message to a buffer and wait ack from the server. + + if receive in timeout seconds (default - infinite): raise TimeoutError() + """ + raise NotImplementedError() + + def async_commit_with_ack( + self, mess: ICommittable + ) -> Union[CommitResult, List[CommitResult]]: + """ + write commit message to a buffer and return Future for wait result. + """ + raise NotImplementedError() + + def async_flush(self) -> concurrent.futures.Future: + """ + force send all commit messages from internal buffers to server and return Future for wait server acks. + """ + raise NotImplementedError() + + def flush(self): + """ + force send all commit messages from internal buffers to server and wait acks for all of them. 
+ """ + raise NotImplementedError() + + def close(self): + if self._closed: + return + self._closed = True + + # for no call self._call_sync on closed object + asyncio.run_coroutine_threadsafe( + self._async_reader.close(), self._loop + ).result() diff --git a/ydb/_topic_writer/topic_writer_sync.py b/ydb/_topic_writer/topic_writer_sync.py index 2c58325e..419edcba 100644 --- a/ydb/_topic_writer/topic_writer_sync.py +++ b/ydb/_topic_writer/topic_writer_sync.py @@ -2,7 +2,6 @@ import asyncio from concurrent.futures import Future -import threading from typing import Union, List, Optional, Coroutine from .._grpc.grpcwrapper.common_utils import SupportedDriverType @@ -16,40 +15,7 @@ ) from .topic_writer_asyncio import WriterAsyncIO -from .._topic_common.common import TimeoutType - -_shared_event_loop_lock = threading.Lock() -_shared_event_loop = None # type: Optional[asyncio.AbstractEventLoop] - - -def _get_shared_event_loop() -> asyncio.AbstractEventLoop: - global _shared_event_loop - - if _shared_event_loop is not None: - return _shared_event_loop - - with _shared_event_loop_lock: - if _shared_event_loop is not None: - return _shared_event_loop - - event_loop_set_done = Future() - - def start_event_loop(): - global _shared_event_loop - _shared_event_loop = asyncio.new_event_loop() - event_loop_set_done.set_result(None) - asyncio.set_event_loop(_shared_event_loop) - _shared_event_loop.run_forever() - - t = threading.Thread( - target=start_event_loop, - name="Common ydb topic writer event loop", - daemon=True, - ) - t.start() - - event_loop_set_done.result() - return _shared_event_loop +from .._topic_common.common import _get_shared_event_loop, TimeoutType class WriterSync: @@ -62,7 +28,7 @@ def __init__( driver: SupportedDriverType, settings: PublicWriterSettings, *, - eventloop: asyncio.AbstractEventLoop = None, + eventloop: Optional[asyncio.AbstractEventLoop] = None, ): self._closed = False @@ -85,26 +51,29 @@ def __enter__(self): def __exit__(self, exc_type, exc_val, exc_tb): self.close() - def _call(self, coro, *args, **kwargs): + def _call(self, coro): if self._closed: raise TopicWriterError("writer is closed") return asyncio.run_coroutine_threadsafe(coro, self._loop) - def _call_sync(self, coro: Coroutine, timeout, *args, **kwargs): - f = self._call(coro, *args, **kwargs) + def _call_sync(self, coro: Coroutine, timeout): + f = self._call(coro) try: - return f.result(timeout=timeout) + return f.result(timeout) except TimeoutError: f.cancel() raise - def close(self): + def close(self, flush: bool = True): if self._closed: return + self._closed = True + + # for no call self._call_sync on closed object asyncio.run_coroutine_threadsafe( - self._async_writer.close(), self._loop + self._async_writer.close(flush=flush), self._loop ).result() def async_flush(self) -> Future: diff --git a/ydb/topic.py b/ydb/topic.py index 593c0378..9378d100 100644 --- a/ydb/topic.py +++ b/ydb/topic.py @@ -7,13 +7,10 @@ from ._topic_reader.topic_reader import ( PublicReaderSettings as TopicReaderSettings, - Reader as TopicReader, - Selector as TopicSelector, - Events as TopicReaderEvents, - RetryPolicy as TopicReaderRetryPolicy, - StubEvent as TopicReaderStubEvent, ) +from ._topic_reader.topic_reader_sync import TopicReaderSync as TopicReader + from ._topic_reader.topic_reader_asyncio import ( PublicAsyncIOReader as TopicReaderAsyncIO, ) @@ -241,26 +238,27 @@ def drop_topic(self, path: str): def topic_reader( self, - topic: Union[str, TopicSelector, List[Union[str, TopicSelector]]], consumer: str, - 
commit_batch_time: Union[float, None] = 0.1, - commit_batch_count: Union[int, None] = 1000, + topic: str, buffer_size_bytes: int = 50 * 1024 * 1024, - sync_commit: bool = False, # reader.commit(...) will wait commit ack from server - on_commit: Callable[["TopicReaderStubEvent"], None] = None, - on_get_partition_start_offset: Callable[ - ["TopicReaderEvents.OnPartitionGetStartOffsetRequest"], - "TopicReaderEvents.OnPartitionGetStartOffsetResponse", - ] = None, - on_init_partition: Callable[["StubEvent"], None] = None, - on_shutdown_partition: Callable[["StubEvent"], None] = None, - decoder: Union[Mapping[int, Callable[[bytes], bytes]], None] = None, - deserializer: Union[Callable[[bytes], Any], None] = None, - one_attempt_connection_timeout: Union[float, None] = 1, - connection_timeout: Union[float, None] = None, - retry_policy: Union["TopicReaderRetryPolicy", None] = None, + # on_commit: Callable[["Events.OnCommit"], None] = None + # on_get_partition_start_offset: Callable[ + # ["Events.OnPartitionGetStartOffsetRequest"], + # "Events.OnPartitionGetStartOffsetResponse", + # ] = None + # on_partition_session_start: Callable[["StubEvent"], None] = None + # on_partition_session_stop: Callable[["StubEvent"], None] = None + # on_partition_session_close: Callable[["StubEvent"], None] = None # todo? + # decoder: Union[Mapping[int, Callable[[bytes], bytes]], None] = None + # deserializer: Union[Callable[[bytes], Any], None] = None + # one_attempt_connection_timeout: Union[float, None] = 1 + # connection_timeout: Union[float, None] = None + # retry_policy: Union["RetryPolicy", None] = None ) -> TopicReader: - raise NotImplementedError() + args = locals() + del args["self"] + settings = TopicReaderSettings(**args) + return TopicReader(self._driver, settings) def topic_writer( self, From 2249c6f109a287a62d5f6250a678529af6f79aca Mon Sep 17 00:00:00 2001 From: Timofey Koolin Date: Tue, 28 Feb 2023 18:16:57 +0300 Subject: [PATCH 2/2] typo --- ydb/_topic_common/common.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ydb/_topic_common/common.py b/ydb/_topic_common/common.py index 5bb10654..f2d6ca9b 100644 --- a/ydb/_topic_common/common.py +++ b/ydb/_topic_common/common.py @@ -54,7 +54,7 @@ def start_event_loop(): t = threading.Thread( target=start_event_loop, - name="Common ydb topic writer event loop", + name="Common ydb topic event loop", daemon=True, ) t.start()
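
Below is a minimal usage sketch of the synchronous reader added by this patch, mirroring the new test in tests/topics/test_topic_reader.py. It is not part of the patch itself: the endpoint, database, consumer and topic names are placeholders, and the batch/message attributes (batch.messages, message.data) are assumed to match the existing asyncio reader's datatypes.

import ydb

# Placeholder connection settings - substitute your own endpoint and database.
driver = ydb.Driver(endpoint="grpc://localhost:2136", database="/local")
driver.wait(timeout=5)

# topic_reader(consumer, topic) builds PublicReaderSettings from its arguments
# and returns the new synchronous TopicReaderSync wrapper.
reader = driver.topic_client.topic_reader("my-consumer", "/local/my-topic")

# receive_batch() blocks on the shared background event loop until a batch is
# available; passing timeout=... raises TimeoutError instead of waiting forever.
batch = reader.receive_batch()
if batch is not None:
    for message in batch.messages:
        print(message.data)
    # commit() only puts the commit marker into an internal buffer;
    # it does not wait for a server acknowledgement.
    reader.commit(batch)

reader.close()

Design note: the wrapper drives the existing PublicAsyncIOReader on the shared daemon event loop that this patch moves into ydb/_topic_common/common.py, so synchronous readers and writers created in one process reuse a single background thread instead of spawning one per instance.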