Skip to content

Commit 74b7c06

Browse files
committed
fix review comments
1 parent badb27a commit 74b7c06

File tree

3 files changed

+62
-68
lines changed

3 files changed

+62
-68
lines changed

ydb/_grpc/grpcwrapper/ydb_topic.py

Lines changed: 41 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@
3333
)
3434

3535

36-
class Codec(int, IToPublic):
36+
class Codec(int, IToPublic, IFromPublic):
3737
CODEC_UNSPECIFIED = 0
3838
CODEC_RAW = 1
3939
CODEC_GZIP = 2
@@ -47,9 +47,13 @@ def from_proto_iterable(codecs: typing.Iterable[int]) -> List["Codec"]:
4747
def to_public(self) -> ydb_topic_public_types.PublicCodec:
4848
return ydb_topic_public_types.PublicCodec(int(self))
4949

50+
@staticmethod
51+
def from_public(codec: Union[ydb_topic_public_types.PublicCodec, int]) -> "Codec":
52+
return Codec(int(codec))
53+
5054

5155
@dataclass
52-
class SupportedCodecs(IToProto, IFromProto, IToPublic):
56+
class SupportedCodecs(IToProto, IFromProto, IToPublic, IFromPublic):
5357
codecs: List[Codec]
5458

5559
def to_proto(self) -> ydb_topic_pb2.SupportedCodecs:
@@ -69,6 +73,13 @@ def from_proto(msg: Optional[ydb_topic_pb2.SupportedCodecs]) -> "SupportedCodecs
6973
def to_public(self) -> List[ydb_topic_public_types.PublicCodec]:
7074
return list(map(Codec.to_public, self.codecs))
7175

76+
@staticmethod
77+
def from_public(codecs: Optional[List[Union[ydb_topic_public_types.PublicCodec, int]]]) -> Optional["SupportedCodecs"]:
78+
if codecs is None:
79+
return None
80+
81+
return SupportedCodecs(codecs=[Codec.from_public(codec) for codec in codecs])
82+
7283

7384
@dataclass(order=True)
7485
class OffsetsRange(IFromProto, IToProto):
@@ -886,17 +897,21 @@ def from_proto(
886897
@dataclass
887898
class AlterConsumer(IToProto, IFromPublic):
888899
name: str
889-
set_important: bool
890-
set_read_from: datetime.datetime
891-
set_supported_codecs: SupportedCodecs
892-
alter_attributes: Dict[str, str]
900+
set_important: Optional[bool]
901+
set_read_from: Optional[datetime.datetime]
902+
set_supported_codecs: Optional[SupportedCodecs]
903+
alter_attributes: Optional[Dict[str, str]]
893904

894905
def to_proto(self) -> ydb_topic_pb2.AlterConsumer:
906+
supported_codecs = None
907+
if self.set_supported_codecs is not None:
908+
supported_codecs = self.set_supported_codecs.to_proto()
909+
895910
return ydb_topic_pb2.AlterConsumer(
896911
name=self.name,
897912
set_important=self.set_important,
898913
set_read_from=proto_timestamp_from_datetime(self.set_read_from),
899-
set_supported_codecs=self.set_supported_codecs.to_proto(),
914+
set_supported_codecs=supported_codecs,
900915
alter_attributes=self.alter_attributes,
901916
)
902917

@@ -905,13 +920,11 @@ def from_public(alter_consumer: ydb_topic_public_types.PublicAlterConsumer) -> A
905920
if not alter_consumer:
906921
return None
907922

908-
supported_codecs = alter_consumer.set_supported_codecs if alter_consumer.set_supported_codecs else []
909-
910923
return AlterConsumer(
911924
name=alter_consumer.name,
912925
set_important=alter_consumer.set_important,
913926
set_read_from=alter_consumer.set_read_from,
914-
set_supported_codecs=SupportedCodecs(codecs=supported_codecs),
927+
set_supported_codecs=SupportedCodecs.from_public(alter_consumer.set_supported_codecs),
915928
alter_attributes=alter_consumer.alter_attributes,
916929
)
917930

@@ -936,16 +949,9 @@ def to_proto(self) -> ydb_topic_pb2.PartitioningSettings:
936949

937950

938951
@dataclass
939-
class AlterPartitioningSettings(IToProto, IFromProto):
940-
set_min_active_partitions: int
941-
set_partition_count_limit: int
942-
943-
@staticmethod
944-
def from_proto(msg: ydb_topic_pb2.AlterPartitioningSettings) -> "AlterPartitioningSettings":
945-
return AlterPartitioningSettings(
946-
set_min_active_partitions=msg.set_min_active_partitions,
947-
set_partition_count_limit=msg.set_partition_count_limit,
948-
)
952+
class AlterPartitioningSettings(IToProto):
953+
set_min_active_partitions: Optional[int]
954+
set_partition_count_limit: Optional[int]
949955

950956
def to_proto(self) -> ydb_topic_pb2.AlterPartitioningSettings:
951957
return ydb_topic_pb2.AlterPartitioningSettings(
@@ -1050,20 +1056,22 @@ class CreateTopicResult:
10501056
@dataclass
10511057
class AlterTopicRequest(IToProto, IFromPublic):
10521058
path: str
1053-
add_consumers: List["Consumer"]
1054-
alter_partitioning_settings: AlterPartitioningSettings
1055-
set_retention_period: datetime.timedelta
1056-
set_retention_storage_mb: int
1057-
set_supported_codecs: SupportedCodecs
1058-
set_partition_write_burst_bytes: typing.Optional[int]
1059-
set_partition_write_speed_bytes_per_second: typing.Optional[int]
1060-
alter_attributes: Dict[str, str]
1061-
alter_consumers: List[AlterConsumer]
1062-
drop_consumers: List[str]
1063-
set_metering_mode: "MeteringMode"
1059+
add_consumers: Optional[List["Consumer"]]
1060+
alter_partitioning_settings: Optional[AlterPartitioningSettings]
1061+
set_retention_period: Optional[datetime.timedelta]
1062+
set_retention_storage_mb: Optional[int]
1063+
set_supported_codecs: Optional[SupportedCodecs]
1064+
set_partition_write_burst_bytes: Optional[int]
1065+
set_partition_write_speed_bytes_per_second: Optional[int]
1066+
alter_attributes: Optional[Dict[str, str]]
1067+
alter_consumers: Optional[List[AlterConsumer]]
1068+
drop_consumers: Optional[List[str]]
1069+
set_metering_mode: Optional["MeteringMode"]
10641070

10651071
def to_proto(self) -> ydb_topic_pb2.AlterTopicRequest:
1066-
supported_codecs = self.set_supported_codecs.to_proto() if self.set_supported_codecs.codecs else None
1072+
supported_codecs = None
1073+
if self.set_supported_codecs is not None:
1074+
supported_codecs = self.set_supported_codecs.to_proto()
10671075

10681076
return ydb_topic_pb2.AlterTopicRequest(
10691077
path=self.path,
@@ -1098,8 +1106,6 @@ def from_public(req: ydb_topic_public_types.AlterTopicRequestParams) -> AlterTop
10981106

10991107
drop_consumers = req.drop_consumers if req.drop_consumers else []
11001108

1101-
supported_codecs = req.set_supported_codecs if req.set_supported_codecs else []
1102-
11031109
return AlterTopicRequest(
11041110
path=req.path,
11051111
alter_partitioning_settings=AlterPartitioningSettings(
@@ -1109,9 +1115,7 @@ def from_public(req: ydb_topic_public_types.AlterTopicRequestParams) -> AlterTop
11091115
add_consumers=add_consumers,
11101116
set_retention_period=req.set_retention_period,
11111117
set_retention_storage_mb=req.set_retention_storage_mb,
1112-
set_supported_codecs=SupportedCodecs(
1113-
codecs=supported_codecs,
1114-
),
1118+
set_supported_codecs=SupportedCodecs.from_public(req.set_supported_codecs),
11151119
set_partition_write_burst_bytes=req.set_partition_write_burst_bytes,
11161120
set_partition_write_speed_bytes_per_second=req.set_partition_write_speed_bytes_per_second,
11171121
alter_attributes=req.alter_attributes,
@@ -1121,11 +1125,6 @@ def from_public(req: ydb_topic_public_types.AlterTopicRequestParams) -> AlterTop
11211125
)
11221126

11231127

1124-
@dataclass
1125-
class AlterTopicResult:
1126-
pass
1127-
1128-
11291128
@dataclass
11301129
class DescribeTopicRequest:
11311130
path: str

ydb/_grpc/grpcwrapper/ydb_topic_public_types.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ class PublicConsumer:
9393
@dataclass
9494
class PublicAlterConsumer:
9595
name: str
96-
set_important: bool = False
96+
set_important: Optional[bool] = None
9797
"""
9898
Consumer may be marked as 'important'. It means messages for this consumer will never expire due to retention.
9999
User should take care that such consumer never stalls, to prevent running out of disk space.
@@ -102,13 +102,13 @@ class PublicAlterConsumer:
102102
set_read_from: Optional[datetime.datetime] = None
103103
"All messages with smaller server written_at timestamp will be skipped."
104104

105-
set_supported_codecs: List[PublicCodec] = field(default_factory=lambda: list())
105+
set_supported_codecs: Optional[List[PublicCodec]] = None
106106
"""
107107
List of supported codecs by this consumer.
108108
supported_codecs on topic must be contained inside this list.
109109
"""
110110

111-
alter_attributes: Dict[str, str] = field(default_factory=lambda: dict())
111+
alter_attributes: Optional[Dict[str, str]] = None
112112
"Attributes of consumer"
113113

114114

ydb/_grpc/grpcwrapper/ydb_topic_test.py

Lines changed: 18 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -60,27 +60,22 @@ def test_alter_topic_request_from_public_to_proto():
6060

6161
msg_dict = MessageToDict(request_proto, preserving_proto_field_name=True)
6262

63-
assert msg_dict["path"] == params["path"]
64-
assert len(msg_dict["add_consumers"]) == len(params["add_consumers"])
65-
assert len(msg_dict["alter_consumers"]) == len(params["alter_consumers"])
66-
assert len(msg_dict["drop_consumers"]) == len(params["drop_consumers"])
67-
assert msg_dict["alter_attributes"] == params["alter_attributes"]
68-
69-
assert (
70-
int(msg_dict["alter_partitioning_settings"]["set_min_active_partitions"]) == params["set_min_active_partitions"]
71-
)
72-
assert (
73-
int(msg_dict["alter_partitioning_settings"]["set_partition_count_limit"]) == params["set_partition_count_limit"]
74-
)
75-
76-
assert int(msg_dict["set_partition_write_burst_bytes"]) == params["set_partition_write_burst_bytes"]
77-
assert (
78-
int(msg_dict["set_partition_write_speed_bytes_per_second"])
79-
== params["set_partition_write_speed_bytes_per_second"]
80-
)
81-
assert msg_dict["set_retention_period"] == str(int(params["set_retention_period"].total_seconds())) + "s"
82-
assert int(msg_dict["set_retention_storage_mb"]) == params["set_retention_storage_mb"]
83-
84-
assert msg_dict["set_metering_mode"] == "METERING_MODE_RESERVED_CAPACITY"
63+
expected_dict = {
64+
"path": "topic_name",
65+
"alter_partitioning_settings": {"set_min_active_partitions": "2", "set_partition_count_limit": "4"},
66+
"set_retention_period": "2419200s",
67+
"set_retention_storage_mb": "4",
68+
"set_supported_codecs": {"codecs": [1, 2]},
69+
"set_partition_write_speed_bytes_per_second": "15",
70+
"set_partition_write_burst_bytes": "8",
71+
"alter_attributes": {"key": "value"},
72+
"add_consumers": [
73+
{"name": "new_consumer_1", "supported_codecs": {}},
74+
{"name": "new_consumer_2", "supported_codecs": {}},
75+
],
76+
"drop_consumers": ["redundant_consumer"],
77+
"alter_consumers": [{"name": "old_consumer_1"}, {"name": "old_consumer_2"},],
78+
"set_metering_mode": "METERING_MODE_RESERVED_CAPACITY"
79+
}
8580

86-
assert msg_dict["set_supported_codecs"]["codecs"] == params["set_supported_codecs"]
81+
assert msg_dict == expected_dict

0 commit comments

Comments
 (0)