Skip to content

Commit dfb2af4

Browse files
committed
implement writer codecs
1 parent c125576 commit dfb2af4

File tree

8 files changed

+437
-31
lines changed

8 files changed

+437
-31
lines changed

tests/conftest.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,9 @@ async def topic_path(driver, topic_consumer, database) -> str:
131131
@pytest.fixture()
132132
@pytest.mark.asyncio()
133133
async def topic_with_messages(driver, topic_path):
134-
writer = driver.topic_client.writer(topic_path, producer_id="fixture-producer-id")
134+
writer = driver.topic_client.writer(
135+
topic_path, producer_id="fixture-producer-id", codec=ydb.TopicCodec.RAW
136+
)
135137
await writer.write_with_ack(
136138
[
137139
ydb.TopicWriterMessage(data="123".encode()),

tests/topics/test_topic_writer.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,20 @@ async def test_write_multi_message_with_ack(
8484
assert batch.messages[0].seqno == 2
8585
assert batch.messages[0].data == "456".encode()
8686

87+
@pytest.mark.parametrize(
88+
"codec",
89+
[
90+
ydb.TopicCodec.RAW,
91+
ydb.TopicCodec.GZIP,
92+
None,
93+
],
94+
)
95+
async def test_write_encoded(self, driver: ydb.Driver, topic_path: str, codec):
96+
async with driver.topic_client.writer(topic_path, codec=codec) as writer:
97+
writer.write("a" * 1000)
98+
writer.write("b" * 1000)
99+
writer.write("c" * 1000)
100+
87101

88102
class TestTopicWriterSync:
89103
def test_send_message(self, driver_sync: ydb.Driver, topic_path):
@@ -163,3 +177,17 @@ def test_write_multi_message_with_ack(
163177
assert batch.messages[0].offset == 1
164178
assert batch.messages[0].seqno == 2
165179
assert batch.messages[0].data == "456".encode()
180+
181+
@pytest.mark.parametrize(
182+
"codec",
183+
[
184+
ydb.TopicCodec.RAW,
185+
ydb.TopicCodec.GZIP,
186+
None,
187+
],
188+
)
189+
def test_write_encoded(self, driver_sync: ydb.Driver, topic_path: str, codec):
190+
with driver_sync.topic_client.writer(topic_path, codec=codec) as writer:
191+
writer.write("a" * 1000)
192+
writer.write("b" * 1000)
193+
writer.write("c" * 1000)

ydb/_grpc/grpcwrapper/ydb_topic_public_types.py

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,11 +31,18 @@ class CreateTopicRequestParams:
3131

3232

3333
class PublicCodec(int):
34+
"""
35+
A codec value may be any integer.
36+
37+
The values below are only the well-known predefined ones;
38+
the protocol also supports custom codecs.
39+
"""
40+
3441
UNSPECIFIED = 0
3542
RAW = 1
3643
GZIP = 2
37-
LZOP = 3
38-
ZSTD = 4
44+
LZOP = 3 # No codec for this is available in the standard library
45+
ZSTD = 4 # No codec for this is available in the standard library
3946

4047

4148
class PublicMeteringMode(IntEnum):

ydb/_topic_writer/topic_writer.py

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import concurrent.futures
12
import datetime
23
import enum
34
import uuid
@@ -10,7 +11,7 @@
1011
import ydb.aio
1112
from .._grpc.grpcwrapper.ydb_topic import Codec, StreamWriteMessage
1213
from .._grpc.grpcwrapper.common_utils import IToProto
13-
14+
from .._grpc.grpcwrapper.ydb_topic_public_types import PublicCodec
1415

1516
MessageType = typing.Union["PublicMessage", "PublicMessage.SimpleMessageSourceType"]
1617

@@ -29,6 +30,10 @@ class PublicWriterSettings:
2930
partition_id: Optional[int] = None
3031
auto_seqno: bool = True
3132
auto_created_at: bool = True
33+
codec: Optional[PublicCodec] = None # default means auto-select
34+
encoder_executor: Optional[
35+
concurrent.futures.Executor
36+
] = None # defaults to the shared client executor pool
3237
# get_last_seqno: bool = False
3338
# encoders: Union[Mapping[int, Callable[[bytes], bytes]], None] = None
3439
# serializer: Union[Callable[[Any], bytes], None] = None
@@ -85,8 +90,9 @@ class SendMode(Enum):
8590

8691
@dataclass
8792
class PublicWriterInitInfo:
88-
__slots__ = "last_seqno"
93+
__slots__ = ("last_seqno", "supported_codecs")
8994
last_seqno: Optional[int]
95+
supported_codecs: List[PublicCodec]
9096

9197

9298
class PublicMessage:
@@ -117,15 +123,17 @@ def _create_message(
117123

118124

119125
class InternalMessage(StreamWriteMessage.WriteRequest.MessageData, IToProto):
126+
codec: PublicCodec
127+
120128
def __init__(self, mess: PublicMessage):
121-
StreamWriteMessage.WriteRequest.MessageData.__init__(
122-
self,
129+
super().__init__(
123130
seq_no=mess.seqno,
124131
created_at=mess.created_at,
125132
data=mess.data,
126133
uncompressed_size=len(mess.data),
127134
partitioning=None,
128135
)
136+
self.codec = PublicCodec.RAW
129137

130138
def get_bytes(self) -> bytes:
131139
if self.data is None:

0 commit comments

Comments
 (0)