Skip to content

Codec support for writer #198

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Mar 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,9 @@ async def topic_path(driver, topic_consumer, database) -> str:
@pytest.fixture()
@pytest.mark.asyncio()
async def topic_with_messages(driver, topic_path):
writer = driver.topic_client.writer(topic_path, producer_id="fixture-producer-id")
writer = driver.topic_client.writer(
topic_path, producer_id="fixture-producer-id", codec=ydb.TopicCodec.RAW
)
await writer.write_with_ack(
[
ydb.TopicWriterMessage(data="123".encode()),
Expand Down
28 changes: 28 additions & 0 deletions tests/topics/test_topic_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,20 @@ async def test_write_multi_message_with_ack(
assert batch.messages[0].seqno == 2
assert batch.messages[0].data == "456".encode()

@pytest.mark.parametrize("codec", [ydb.TopicCodec.RAW, ydb.TopicCodec.GZIP, None])
async def test_write_encoded(self, driver: ydb.Driver, topic_path: str, codec):
    """Smoke-test the async writer with each codec setting.

    Opens a writer on ``topic_path`` with the parametrized ``codec`` (RAW,
    GZIP, or ``None``) and writes three 1000-character string payloads.
    The test makes no assertions: it passes when opening the writer, the
    writes, and leaving the ``async with`` block raise no exception.
    """
    async with driver.topic_client.writer(topic_path, codec=codec) as writer:
        # NOTE(review): write() is called without ``await`` here -- confirm
        # that the async writer's write() is not a coroutine function.
        for letter in ("a", "b", "c"):
            writer.write(letter * 1000)


class TestTopicWriterSync:
def test_send_message(self, driver_sync: ydb.Driver, topic_path):
Expand Down Expand Up @@ -163,3 +177,17 @@ def test_write_multi_message_with_ack(
assert batch.messages[0].offset == 1
assert batch.messages[0].seqno == 2
assert batch.messages[0].data == "456".encode()

@pytest.mark.parametrize("codec", [ydb.TopicCodec.RAW, ydb.TopicCodec.GZIP, None])
def test_write_encoded(self, driver_sync: ydb.Driver, topic_path: str, codec):
    """Smoke-test the synchronous writer with each codec setting.

    Opens a writer on ``topic_path`` with the parametrized ``codec`` (RAW,
    GZIP, or ``None``) and writes three 1000-character string payloads.
    The test makes no assertions: it passes when opening the writer, the
    writes, and leaving the ``with`` block raise no exception.
    """
    with driver_sync.topic_client.writer(topic_path, codec=codec) as writer:
        for letter in ("a", "b", "c"):
            writer.write(letter * 1000)
11 changes: 9 additions & 2 deletions ydb/_grpc/grpcwrapper/ydb_topic_public_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,18 @@ class CreateTopicRequestParams:


class PublicCodec(int):
    """
    Codec identifier; a codec value may be any integer.

    The constants below are only the well-known predefined values;
    the protocol also supports custom codecs.
    """

    UNSPECIFIED = 0
    RAW = 1
    GZIP = 2
    LZOP = 3  # No codec implementation in the standard library
    ZSTD = 4  # No codec implementation in the standard library


class PublicMeteringMode(IntEnum):
Expand Down
26 changes: 17 additions & 9 deletions ydb/_topic_writer/topic_writer.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import concurrent.futures
import datetime
import enum
import uuid
Expand All @@ -10,9 +11,9 @@
import ydb.aio
from .._grpc.grpcwrapper.ydb_topic import Codec, StreamWriteMessage
from .._grpc.grpcwrapper.common_utils import IToProto
from .._grpc.grpcwrapper.ydb_topic_public_types import PublicCodec


MessageType = typing.Union["PublicMessage", "PublicMessage.SimpleMessageSourceType"]
Message = typing.Union["PublicMessage", "PublicMessage.SimpleMessageSourceType"]


@dataclass
Expand All @@ -29,8 +30,14 @@ class PublicWriterSettings:
partition_id: Optional[int] = None
auto_seqno: bool = True
auto_created_at: bool = True
codec: Optional[PublicCodec] = None # default mean auto-select
encoder_executor: Optional[
concurrent.futures.Executor
] = None # default shared client executor pool
encoders: Optional[
typing.Mapping[PublicCodec, typing.Callable[[bytes], bytes]]
] = None
# get_last_seqno: bool = False
# encoders: Union[Mapping[int, Callable[[bytes], bytes]], None] = None
# serializer: Union[Callable[[Any], bytes], None] = None
# send_buffer_count: Optional[int] = 10000
# send_buffer_bytes: Optional[int] = 100 * 1024 * 1024
Expand Down Expand Up @@ -85,8 +92,9 @@ class SendMode(Enum):

@dataclass
class PublicWriterInitInfo:
    """Information about a writer, exposed as two slotted fields.

    NOTE(review): presumably filled from the server's response to the
    writer's init request -- confirm against the code that constructs it.
    """

    # A single __slots__ tuple naming every declared field; neither field has
    # a default value, so the slot names do not clash with class attributes.
    __slots__ = ("last_seqno", "supported_codecs")
    # May be None; exact meaning of None is not visible here -- TODO confirm.
    last_seqno: Optional[int]
    # List of codec identifiers (see PublicCodec).
    supported_codecs: List[PublicCodec]


class PublicMessage:
Expand All @@ -108,24 +116,24 @@ def __init__(
self.data = data

@staticmethod
def _create_message(data: Message) -> "PublicMessage":
    """Return ``data`` as a ``PublicMessage``.

    An existing ``PublicMessage`` is returned unchanged (same object);
    any other value is wrapped via ``PublicMessage(data=data)``.
    """
    if isinstance(data, PublicMessage):
        return data
    return PublicMessage(data=data)


class InternalMessage(StreamWriteMessage.WriteRequest.MessageData, IToProto):
codec: PublicCodec

def __init__(self, mess: PublicMessage):
StreamWriteMessage.WriteRequest.MessageData.__init__(
self,
super().__init__(
seq_no=mess.seqno,
created_at=mess.created_at,
data=mess.data,
uncompressed_size=len(mess.data),
partitioning=None,
)
self.codec = PublicCodec.RAW

def get_bytes(self) -> bytes:
if self.data is None:
Expand Down
Loading