Merged
12 changes: 11 additions & 1 deletion tests/topics/test_topic_reader.py
@@ -2,11 +2,21 @@


@pytest.mark.asyncio
class TestTopicWriterAsyncIO:
class TestTopicReaderAsyncIO:
async def test_read_message(
self, driver, topic_path, topic_with_messages, topic_consumer
):
reader = driver.topic_client.topic_reader(topic_consumer, topic_path)

assert await reader.receive_batch() is not None
await reader.close()


class TestTopicReaderSync:
def test_read_message(
self, driver_sync, topic_path, topic_with_messages, topic_consumer
):
reader = driver_sync.topic_client.topic_reader(topic_consumer, topic_path)

assert reader.receive_batch() is not None
reader.close()
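
Outside of pytest, the synchronous read flow exercised by the test above might look like the sketch below. The endpoint, database, consumer, and topic names are placeholders, and the batch/message attribute access is an assumption about the reader's data types rather than something this diff confirms.

import ydb

# Hypothetical connection settings -- replace with real cluster values.
driver = ydb.Driver(endpoint="grpc://localhost:2136", database="/local")
driver.wait(timeout=5)

# Same call pattern as TestTopicReaderSync: consumer name first, then topic path.
reader = driver.topic_client.topic_reader("my-consumer", "/local/my-topic")
try:
    batch = reader.receive_batch()
    if batch is not None:
        for message in batch.messages:  # assumed PublicBatch.messages
            print(message.data)         # assumed PublicMessage.data
finally:
    reader.close()
    driver.stop()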
37 changes: 37 additions & 0 deletions ydb/_topic_common/common.py
@@ -1,4 +1,8 @@
import asyncio
import concurrent.futures
import threading
import typing
from typing import Optional

from .. import operation, issues
from .._grpc.grpcwrapper.common_utils import IFromProtoWithProtoType
@@ -24,3 +28,36 @@ def wrapper(rpc_state, response_pb, driver=None):
return result_type.from_proto(msg)

return wrapper


_shared_event_loop_lock = threading.Lock()
_shared_event_loop = None # type: Optional[asyncio.AbstractEventLoop]


def _get_shared_event_loop() -> asyncio.AbstractEventLoop:
global _shared_event_loop

if _shared_event_loop is not None:
return _shared_event_loop

with _shared_event_loop_lock:
if _shared_event_loop is not None:
return _shared_event_loop

event_loop_set_done = concurrent.futures.Future()

def start_event_loop():
event_loop = asyncio.new_event_loop()
event_loop_set_done.set_result(event_loop)
asyncio.set_event_loop(event_loop)
event_loop.run_forever()

t = threading.Thread(
target=start_event_loop,
name="Common ydb topic event loop",
daemon=True,
)
t.start()

_shared_event_loop = event_loop_set_done.result()
return _shared_event_loop
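
The helper above lazily starts a single asyncio event loop in a daemon thread, guarding the module-level global with double-checked locking, so synchronous wrappers can drive the asyncio-based topic machinery from blocking code. A minimal sketch of how such a shared loop is typically consumed -- run_on_shared_loop and the sample coroutine are illustrative helpers, not part of this change:

import asyncio

def run_on_shared_loop(coro, timeout=None):
    # Schedule the coroutine on the shared background loop and block until it finishes.
    loop = _get_shared_event_loop()
    future = asyncio.run_coroutine_threadsafe(coro, loop)  # a concurrent.futures.Future
    return future.result(timeout)

async def sample():
    await asyncio.sleep(0.1)
    return 42

print(run_on_shared_loop(sample(), timeout=5))  # prints 42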
127 changes: 0 additions & 127 deletions ydb/_topic_reader/topic_reader.py
@@ -1,16 +1,13 @@
import concurrent.futures
import enum
import datetime
from dataclasses import dataclass
from typing import (
Union,
Optional,
List,
Iterable,
)

from ..table import RetrySettings
from .datatypes import ICommittable, PublicBatch, PublicMessage
from .._topic_common.common import TokenGetterFuncType
from .._grpc.grpcwrapper.ydb_topic import StreamReadMessage, OffsetsRange

@@ -26,130 +23,6 @@ def __init__(self, path, *, partitions: Union[None, int, List[int]] = None):
self.partitions = partitions


class Reader(object):
def async_sessions_stat(self) -> concurrent.futures.Future:
"""
Receive session statistics from the server and return a future.
"""
raise NotImplementedError()

async def sessions_stat(self) -> List["SessionStat"]:
"""
Receive session statistics from the server.

Use async_sessions_stat to set an explicit wait timeout.
"""
raise NotImplementedError()

def messages(
self, *, timeout: Union[float, None] = None
) -> Iterable[PublicMessage]:
"""
todo?

Block until a new message is received.
There is no async_ version, to avoid losing messages; use async_wait_message as a signal that new batches are available.

If no new message arrives within timeout seconds (default: infinite), iteration stops by raising StopIteration.
If timeout <= 0, the call is non-blocking and returns only messages already in the internal buffer.
"""
raise NotImplementedError()

def receive_message(self, *, timeout: Union[float, None] = None) -> PublicMessage:
"""
Block until a new message is received.
There is no async_ version, to avoid losing messages; use async_wait_message as a signal that new batches are available.

If no new message arrives within timeout seconds (default: infinite), raise TimeoutError().
If timeout <= 0, the call is non-blocking and returns only messages already in the internal buffer.
"""
raise NotImplementedError()

def async_wait_message(self) -> concurrent.futures.Future:
"""
Return a future that completes when the reader has at least one message in its queue.
If the reader already has a message, the returned future is already completed.

It is possible to receive the signal that a message is available and still find no message when trying to receive one,
for example if the message expired between the event being sent and the retrieval attempt (such as after a broken connection).
"""
raise NotImplementedError()

def batches(
self,
*,
max_messages: Union[int, None] = None,
max_bytes: Union[int, None] = None,
timeout: Union[float, None] = None,
) -> Iterable[PublicBatch]:
"""
Block until a new batch is received.
There is no async_ version, to avoid losing messages; use async_wait_message as a signal that new batches are available.

If no new message arrives within timeout seconds (default: infinite), iteration stops by raising StopIteration.
If timeout <= 0, the call is non-blocking and returns only messages already in the internal buffer.
"""
raise NotImplementedError()

def receive_batch(
self,
*,
max_messages: Union[int, None] = None,
max_bytes: Union[int, None] = None,
timeout: Union[float, None] = None,
) -> Union[PublicBatch, None]:
"""
Get one message batch from the reader.
There is no async_ version, to avoid losing messages; use async_wait_message as a signal that new batches are available.

If no new message arrives within timeout seconds (default: infinite), raise TimeoutError().
If timeout <= 0, the call is non-blocking and returns only messages already in the internal buffer.
"""
raise NotImplementedError()

def commit(self, mess: ICommittable):
"""
Put a commit message into the internal buffer.

This method provides no way to check the commit result
(for example, if the connection is lost, commits are not re-sent and committed messages will be received again).
"""
raise NotImplementedError()

def commit_with_ack(
self, mess: ICommittable
) -> Union["CommitResult", List["CommitResult"]]:
"""
Write a commit message to the buffer and wait for an ack from the server.

If no ack is received within timeout seconds (default: infinite), raise TimeoutError().
"""
raise NotImplementedError()

def async_commit_with_ack(
self, mess: ICommittable
) -> Union["CommitResult", List["CommitResult"]]:
"""
Write a commit message to the buffer and return a future to wait for the result.
"""
raise NotImplementedError()

def async_flush(self) -> concurrent.futures.Future:
"""
Force-send all commit messages from the internal buffers to the server and return a future to wait for the server acks.
"""
raise NotImplementedError()

def flush(self):
"""
Force-send all commit messages from the internal buffers to the server and wait for acks for all of them.
"""
raise NotImplementedError()

def close(self):
raise NotImplementedError()


@dataclass
class PublicReaderSettings:
consumer: str