Skip to content

Commit e0df5a5

Browse files
committed
add sync reader
1 parent eb5ee9f commit e0df5a5

File tree

6 files changed

+115
-39
lines changed

6 files changed

+115
-39
lines changed

tests/topics/test_topic_reader.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,19 @@
22

33

44
@pytest.mark.asyncio
5-
class TestTopicWriterAsyncIO:
5+
class TestTopicReaderAsyncIO:
66
async def test_read_message(
77
self, driver, topic_path, topic_with_messages, topic_consumer
88
):
99
reader = driver.topic_client.topic_reader(topic_consumer, topic_path)
1010

1111
assert await reader.receive_batch() is not None
12+
13+
14+
class TestTopicReaderSync:
15+
def test_read_message(
16+
self, driver_sync, topic_path, topic_with_messages, topic_consumer
17+
):
18+
reader = driver_sync.topic_client.topic_reader(topic_consumer, topic_path)
19+
20+
assert reader.receive_batch() is not None

ydb/_topic_common/common.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
import concurrent.futures
33
import threading
44
import typing
5+
from typing import Optional
56

67
from .. import operation, issues
78
from .._grpc.grpcwrapper.common_utils import IFromProtoWithProtoType
@@ -27,6 +28,7 @@ def wrapper(rpc_state, response_pb, driver=None):
2728

2829
return wrapper
2930

31+
3032
_shared_event_loop_lock = threading.Lock()
3133
_shared_event_loop = None # type: Optional[asyncio.AbstractEventLoop]
3234

ydb/_topic_reader/topic_reader.py

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,13 @@
1-
import concurrent.futures
21
import enum
32
import datetime
43
from dataclasses import dataclass
54
from typing import (
65
Union,
76
Optional,
87
List,
9-
Iterable,
108
)
119

1210
from ..table import RetrySettings
13-
from .datatypes import ICommittable, PublicBatch, PublicMessage
1411
from .._topic_common.common import TokenGetterFuncType
1512
from .._grpc.grpcwrapper.ydb_topic import StreamReadMessage, OffsetsRange
1613

ydb/_topic_reader/topic_reader_sync.py

Lines changed: 77 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,72 @@
1+
import asyncio
2+
import concurrent.futures
3+
import typing
4+
from typing import List, Union, Iterable, Optional, Coroutine
5+
6+
from ydb._grpc.grpcwrapper.common_utils import SupportedDriverType
7+
from ydb._topic_common.common import _get_shared_event_loop
8+
from ydb._topic_reader.datatypes import PublicMessage, PublicBatch, ICommittable
9+
from ydb._topic_reader.topic_reader import (
10+
PublicReaderSettings,
11+
SessionStat,
12+
CommitResult,
13+
)
14+
from ydb._topic_reader.topic_reader_asyncio import (
15+
PublicAsyncIOReader,
16+
TopicReaderClosedError,
17+
)
18+
119

220
class TopicReaderSync:
21+
_loop: asyncio.AbstractEventLoop
22+
_async_reader: PublicAsyncIOReader
23+
_closed: bool
24+
25+
def __init__(
26+
self,
27+
driver: SupportedDriverType,
28+
settings: PublicReaderSettings,
29+
*,
30+
eventloop: Optional[asyncio.AbstractEventLoop] = None,
31+
):
32+
self._closed = False
33+
34+
if eventloop:
35+
self._loop = eventloop
36+
else:
37+
self._loop = _get_shared_event_loop()
38+
39+
async def create_reader():
40+
return PublicAsyncIOReader(driver, settings)
41+
42+
self._async_reader = asyncio.run_coroutine_threadsafe(
43+
create_reader(), self._loop
44+
).result()
45+
46+
def __del__(self):
47+
self.close()
48+
49+
def _call(self, coro):
50+
if self._closed:
51+
raise TopicReaderClosedError()
52+
53+
return asyncio.run_coroutine_threadsafe(coro, self._loop)
54+
55+
def _call_sync(self, coro: Coroutine, timeout):
56+
f = self._call(coro)
57+
try:
58+
return f.result(timeout)
59+
except TimeoutError:
60+
f.cancel()
61+
raise
62+
363
def async_sessions_stat(self) -> concurrent.futures.Future:
464
"""
565
Receive stat from the server, return feature.
666
"""
767
raise NotImplementedError()
868

9-
async def sessions_stat(self) -> List["SessionStat"]:
69+
async def sessions_stat(self) -> List[SessionStat]:
1070
"""
1171
Receive stat from the server
1272
@@ -67,8 +127,8 @@ def batches(
67127
def receive_batch(
68128
self,
69129
*,
70-
max_messages: Union[int, None] = None,
71-
max_bytes: Union[int, None],
130+
max_messages: typing.Union[int, None] = None,
131+
max_bytes: typing.Union[int, None] = None,
72132
timeout: Union[float, None] = None,
73133
) -> Union[PublicBatch, None]:
74134
"""
@@ -78,7 +138,12 @@ def receive_batch(
78138
if no new message in timeout seconds (default - infinite): raise TimeoutError()
79139
if timeout <= 0 - it will fast non block method, get messages from internal buffer only.
80140
"""
81-
raise NotImplementedError()
141+
return self._call_sync(
142+
self._async_reader.receive_batch(
143+
max_messages=max_messages, max_bytes=max_bytes
144+
),
145+
timeout,
146+
)
82147

83148
def commit(self, mess: ICommittable):
84149
"""
@@ -87,11 +152,11 @@ def commit(self, mess: ICommittable):
87152
For the method no way check the commit result
88153
(for example if lost connection - commits will not re-send and committed messages will receive again)
89154
"""
90-
raise NotImplementedError()
155+
self._call_sync(self._async_reader.commit(mess), None)
91156

92157
def commit_with_ack(
93158
self, mess: ICommittable
94-
) -> Union["CommitResult", List["CommitResult"]]:
159+
) -> Union[CommitResult, List[CommitResult]]:
95160
"""
96161
write commit message to a buffer and wait ack from the server.
97162
@@ -101,7 +166,7 @@ def commit_with_ack(
101166

102167
def async_commit_with_ack(
103168
self, mess: ICommittable
104-
) -> Union["CommitResult", List["CommitResult"]]:
169+
) -> Union[CommitResult, List[CommitResult]]:
105170
"""
106171
write commit message to a buffer and return Future for wait result.
107172
"""
@@ -120,4 +185,8 @@ def flush(self):
120185
raise NotImplementedError()
121186

122187
def close(self):
123-
raise NotImplementedError()
188+
if self._closed:
189+
return
190+
self._closed = True
191+
192+
self._call_sync(self._async_reader.close())

ydb/_topic_writer/topic_writer_sync.py

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
from .topic_writer_asyncio import WriterAsyncIO
1818
from .._topic_common.common import _get_shared_event_loop
1919

20+
2021
class WriterSync:
2122
_loop: asyncio.AbstractEventLoop
2223
_async_writer: WriterAsyncIO
@@ -27,7 +28,7 @@ def __init__(
2728
driver: SupportedDriverType,
2829
settings: PublicWriterSettings,
2930
*,
30-
eventloop: asyncio.AbstractEventLoop = None,
31+
eventloop: Optional[asyncio.AbstractEventLoop] = None,
3132
):
3233

3334
self._closed = False
@@ -44,16 +45,16 @@ async def create_async_writer():
4445
create_async_writer(), self._loop
4546
).result()
4647

47-
def _call(self, coro, *args, **kwargs):
48+
def _call(self, coro):
4849
if self._closed:
4950
raise TopicWriterError("writer is closed")
5051

5152
return asyncio.run_coroutine_threadsafe(coro, self._loop)
5253

53-
def _call_sync(self, coro: Coroutine, timeout, *args, **kwargs):
54-
f = self._call(coro, *args, **kwargs)
54+
def _call_sync(self, coro: Coroutine, timeout):
55+
f = self._call(coro)
5556
try:
56-
return f.result()
57+
return f.result(timeout)
5758
except TimeoutError:
5859
f.cancel()
5960
raise

ydb/topic.py

Lines changed: 20 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,10 @@
77

88
from ._topic_reader.topic_reader import (
99
PublicReaderSettings as TopicReaderSettings,
10-
Reader as TopicReader,
11-
Selector as TopicSelector,
12-
Events as TopicReaderEvents,
13-
RetryPolicy as TopicReaderRetryPolicy,
14-
StubEvent as TopicReaderStubEvent,
1510
)
1611

12+
from ._topic_reader.topic_reader_sync import TopicReaderSync as TopicReader
13+
1714
from ._topic_reader.topic_reader_asyncio import (
1815
PublicAsyncIOReader as TopicReaderAsyncIO,
1916
)
@@ -240,26 +237,27 @@ def drop_topic(self, path: str):
240237

241238
def topic_reader(
242239
self,
243-
topic: Union[str, TopicSelector, List[Union[str, TopicSelector]]],
244240
consumer: str,
245-
commit_batch_time: Union[float, None] = 0.1,
246-
commit_batch_count: Union[int, None] = 1000,
241+
topic: str,
247242
buffer_size_bytes: int = 50 * 1024 * 1024,
248-
sync_commit: bool = False, # reader.commit(...) will wait commit ack from server
249-
on_commit: Callable[["TopicReaderStubEvent"], None] = None,
250-
on_get_partition_start_offset: Callable[
251-
["TopicReaderEvents.OnPartitionGetStartOffsetRequest"],
252-
"TopicReaderEvents.OnPartitionGetStartOffsetResponse",
253-
] = None,
254-
on_init_partition: Callable[["StubEvent"], None] = None,
255-
on_shutdown_partition: Callable[["StubEvent"], None] = None,
256-
decoder: Union[Mapping[int, Callable[[bytes], bytes]], None] = None,
257-
deserializer: Union[Callable[[bytes], Any], None] = None,
258-
one_attempt_connection_timeout: Union[float, None] = 1,
259-
connection_timeout: Union[float, None] = None,
260-
retry_policy: Union["TopicReaderRetryPolicy", None] = None,
243+
# on_commit: Callable[["Events.OnCommit"], None] = None
244+
# on_get_partition_start_offset: Callable[
245+
# ["Events.OnPartitionGetStartOffsetRequest"],
246+
# "Events.OnPartitionGetStartOffsetResponse",
247+
# ] = None
248+
# on_partition_session_start: Callable[["StubEvent"], None] = None
249+
# on_partition_session_stop: Callable[["StubEvent"], None] = None
250+
# on_partition_session_close: Callable[["StubEvent"], None] = None # todo?
251+
# decoder: Union[Mapping[int, Callable[[bytes], bytes]], None] = None
252+
# deserializer: Union[Callable[[bytes], Any], None] = None
253+
# one_attempt_connection_timeout: Union[float, None] = 1
254+
# connection_timeout: Union[float, None] = None
255+
# retry_policy: Union["RetryPolicy", None] = None
261256
) -> TopicReader:
262-
raise NotImplementedError()
257+
args = locals()
258+
del args["self"]
259+
settings = TopicReaderSettings(**args)
260+
return TopicReader(self._driver, settings)
263261

264262
def topic_writer(
265263
self,

0 commit comments

Comments
 (0)