Skip to content

Commit d454b80

Browse files
committed
Alter topic feature
1 parent 36ef09b commit d454b80

File tree

4 files changed

+267
-0
lines changed

4 files changed

+267
-0
lines changed

ydb/_apis.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,7 @@ class TopicService(object):
123123

124124
CreateTopic = "CreateTopic"
125125
DescribeTopic = "DescribeTopic"
126+
AlterTopic = "AlterTopic"
126127
DropTopic = "DropTopic"
127128
StreamRead = "StreamRead"
128129
StreamWrite = "StreamWrite"

ydb/_grpc/grpcwrapper/ydb_topic.py

Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -883,6 +883,39 @@ def from_proto(
883883
)
884884

885885

886+
@dataclass
887+
class AlterConsumer(IToProto, IFromPublic):
888+
name: str
889+
set_important: bool
890+
set_read_from: datetime.datetime
891+
set_supported_codecs: SupportedCodecs
892+
alter_attributes: Dict[str, str]
893+
894+
def to_proto(self) -> ydb_topic_pb2.AlterConsumer:
895+
return ydb_topic_pb2.AlterConsumer(
896+
name=self.name,
897+
set_important=self.set_important,
898+
set_read_from=proto_timestamp_from_datetime(self.set_read_from),
899+
set_supported_codecs=self.set_supported_codecs.to_proto(),
900+
alter_attributes=self.alter_attributes,
901+
)
902+
903+
@staticmethod
904+
def from_public(alter_consumer: ydb_topic_public_types.PublicAlterConsumer) -> AlterConsumer:
905+
if not alter_consumer:
906+
return None
907+
908+
supported_codecs = alter_consumer.set_supported_codecs if alter_consumer.set_supported_codecs else []
909+
910+
return AlterConsumer(
911+
name=alter_consumer.name,
912+
set_important=alter_consumer.set_important,
913+
set_read_from=alter_consumer.set_read_from,
914+
set_supported_codecs=SupportedCodecs(codecs=supported_codecs),
915+
alter_attributes=alter_consumer.alter_attributes,
916+
)
917+
918+
886919
@dataclass
887920
class PartitioningSettings(IToProto, IFromProto):
888921
min_active_partitions: int
@@ -902,6 +935,25 @@ def to_proto(self) -> ydb_topic_pb2.PartitioningSettings:
902935
)
903936

904937

938+
@dataclass
939+
class AlterPartitioningSettings(IToProto, IFromProto):
940+
set_min_active_partitions: int
941+
set_partition_count_limit: int
942+
943+
@staticmethod
944+
def from_proto(msg: ydb_topic_pb2.AlterPartitioningSettings) -> "AlterPartitioningSettings":
945+
return AlterPartitioningSettings(
946+
set_min_active_partitions=msg.set_min_active_partitions,
947+
set_partition_count_limit=msg.set_partition_count_limit,
948+
)
949+
950+
def to_proto(self) -> ydb_topic_pb2.AlterPartitioningSettings:
951+
return ydb_topic_pb2.AlterPartitioningSettings(
952+
set_min_active_partitions=self.set_min_active_partitions,
953+
set_partition_count_limit=self.set_partition_count_limit,
954+
)
955+
956+
905957
class MeteringMode(int, IFromProto, IFromPublic, IToPublic):
906958
UNSPECIFIED = 0
907959
RESERVED_CAPACITY = 1
@@ -995,6 +1047,83 @@ class CreateTopicResult:
9951047
pass
9961048

9971049

1050+
@dataclass
1051+
class AlterTopicRequest(IToProto, IFromPublic):
1052+
path: str
1053+
add_consumers: List["Consumer"]
1054+
alter_partitioning_settings: AlterPartitioningSettings
1055+
set_retention_period: datetime.timedelta
1056+
set_retention_storage_mb: int
1057+
set_supported_codecs: SupportedCodecs
1058+
set_partition_write_burst_bytes: typing.Optional[int]
1059+
set_partition_write_speed_bytes_per_second: typing.Optional[int]
1060+
alter_attributes: Dict[str, str]
1061+
alter_consumers: List[AlterConsumer]
1062+
drop_consumers: List[str]
1063+
set_metering_mode: "MeteringMode"
1064+
1065+
def to_proto(self) -> ydb_topic_pb2.AlterTopicRequest:
1066+
return ydb_topic_pb2.AlterTopicRequest(
1067+
path=self.path,
1068+
add_consumers=[consumer.to_proto() for consumer in self.add_consumers],
1069+
alter_partitioning_settings=self.alter_partitioning_settings.to_proto(),
1070+
set_retention_period=proto_duration_from_timedelta(self.set_retention_period),
1071+
set_retention_storage_mb=self.set_retention_storage_mb,
1072+
set_supported_codecs=self.set_supported_codecs.to_proto(),
1073+
set_partition_write_burst_bytes=self.set_partition_write_burst_bytes,
1074+
set_partition_write_speed_bytes_per_second=self.set_partition_write_speed_bytes_per_second,
1075+
alter_attributes=self.alter_attributes,
1076+
alter_consumers=[consumer.to_proto() for consumer in self.alter_consumers],
1077+
drop_consumers=list(self.drop_consumers),
1078+
set_metering_mode=self.set_metering_mode,
1079+
)
1080+
1081+
@staticmethod
1082+
def from_public(req: ydb_topic_public_types.AlterTopicRequestParams) -> AlterTopicRequest:
1083+
add_consumers = []
1084+
if req.add_consumers:
1085+
for consumer in req.add_consumers:
1086+
if isinstance(consumer, str):
1087+
consumer = ydb_topic_public_types.PublicConsumer(name=consumer)
1088+
add_consumers.append(Consumer.from_public(consumer))
1089+
1090+
alter_consumers = []
1091+
if req.alter_consumers:
1092+
for consumer in req.alter_consumers:
1093+
if isinstance(consumer, str):
1094+
consumer = ydb_topic_public_types.PublicAlterConsumer(name=consumer)
1095+
alter_consumers.append(AlterConsumer.from_public(consumer))
1096+
1097+
drop_consumers = req.drop_consumers if req.drop_consumers else []
1098+
1099+
supported_codecs = req.set_supported_codecs if req.set_supported_codecs else []
1100+
1101+
return AlterTopicRequest(
1102+
path=req.path,
1103+
alter_partitioning_settings=AlterPartitioningSettings(
1104+
set_min_active_partitions=req.set_min_active_partitions,
1105+
set_partition_count_limit=req.set_partition_count_limit,
1106+
),
1107+
add_consumers=add_consumers,
1108+
set_retention_period=req.set_retention_period,
1109+
set_retention_storage_mb=req.set_retention_storage_mb,
1110+
set_supported_codecs=SupportedCodecs(
1111+
codecs=supported_codecs,
1112+
),
1113+
set_partition_write_burst_bytes=req.set_partition_write_burst_bytes,
1114+
set_partition_write_speed_bytes_per_second=req.set_partition_write_speed_bytes_per_second,
1115+
alter_attributes=req.alter_attributes,
1116+
alter_consumers=alter_consumers,
1117+
drop_consumers=drop_consumers,
1118+
set_metering_mode=MeteringMode.from_public(req.set_metering_mode),
1119+
)
1120+
1121+
1122+
@dataclass
1123+
class AlterTopicResult:
1124+
pass
1125+
1126+
9981127
@dataclass
9991128
class DescribeTopicRequest:
10001129
path: str

ydb/_grpc/grpcwrapper/ydb_topic_public_types.py

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,23 @@ class CreateTopicRequestParams:
3030
metering_mode: Optional["PublicMeteringMode"]
3131

3232

33+
@dataclass
34+
class AlterTopicRequestParams:
35+
path: str
36+
set_min_active_partitions: Optional[int]
37+
set_partition_count_limit: Optional[int]
38+
add_consumers: Optional[List[Union["PublicConsumer", str]]]
39+
alter_consumers: Optional[List[Union["PublicAlterConsumer", str]]]
40+
drop_consumers: Optional[List[str]] # TODO: clarify
41+
alter_attributes: Optional[Dict[str, str]]
42+
set_metering_mode: Optional["PublicMeteringMode"]
43+
set_partition_write_speed_bytes_per_second: Optional[int]
44+
set_partition_write_burst_bytes: Optional[int]
45+
set_retention_period: Optional[datetime.timedelta]
46+
set_retention_storage_mb: Optional[int]
47+
set_supported_codecs: Optional[List[Union["PublicCodec", int]]]
48+
49+
3350
class PublicCodec(int):
3451
"""
3552
Codec value may contain any int number.
@@ -73,6 +90,28 @@ class PublicConsumer:
7390
"Attributes of consumer"
7491

7592

93+
@dataclass
94+
class PublicAlterConsumer:
95+
name: str
96+
set_important: bool = False
97+
"""
98+
Consumer may be marked as 'important'. It means messages for this consumer will never expire due to retention.
99+
User should take care that such consumer never stalls, to prevent running out of disk space.
100+
"""
101+
102+
set_read_from: Optional[datetime.datetime] = None
103+
"All messages with smaller server written_at timestamp will be skipped."
104+
105+
set_supported_codecs: List[PublicCodec] = field(default_factory=lambda: list())
106+
"""
107+
List of supported codecs by this consumer.
108+
supported_codecs on topic must be contained inside this list.
109+
"""
110+
111+
alter_attributes: Dict[str, str] = field(default_factory=lambda: dict())
112+
"Attributes of consumer"
113+
114+
76115
@dataclass
77116
class DropTopicRequestParams(IToProto):
78117
path: str

ydb/topic.py

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
"TopicClientSettings",
77
"TopicCodec",
88
"TopicConsumer",
9+
"TopicAlterConsumer",
910
"TopicDescription",
1011
"TopicError",
1112
"TopicMeteringMode",
@@ -77,6 +78,7 @@
7778
PublicPartitionStats as TopicPartitionStats,
7879
PublicCodec as TopicCodec,
7980
PublicConsumer as TopicConsumer,
81+
PublicAlterConsumer as TopicAlterConsumer,
8082
PublicMeteringMode as TopicMeteringMode,
8183
)
8284

@@ -145,6 +147,53 @@ async def create_topic(
145147
_wrap_operation,
146148
)
147149

150+
async def alter_topic(
151+
self,
152+
path: str,
153+
set_min_active_partitions: Optional[int] = None,
154+
set_partition_count_limit: Optional[int] = None,
155+
add_consumers: Optional[List[Union[TopicConsumer, str]]] = None,
156+
alter_consumers: Optional[List[Union[TopicAlterConsumer, str]]] = None,
157+
drop_consumers: Optional[List[str]] = None,
158+
alter_attributes: Optional[Dict[str, str]] = None,
159+
set_metering_mode: Optional[TopicMeteringMode] = None,
160+
set_partition_write_speed_bytes_per_second: Optional[int] = None,
161+
set_partition_write_burst_bytes: Optional[int] = None,
162+
set_retention_period: Optional[datetime.timedelta] = None,
163+
set_retention_storage_mb: Optional[int] = None,
164+
set_supported_codecs: Optional[List[Union[TopicCodec, int]]] = None,
165+
):
166+
"""
167+
alter topic command
168+
169+
:param path: full path to topic
170+
:param set_min_active_partitions: Minimum partition count auto merge would stop working at.
171+
:param set_partition_count_limit: Limit for total partition count, including active (open for write)
172+
and read-only partitions.
173+
:param add_consumers: List of consumers for this topic to add
174+
:param alter_consumers: List of consumers for this topic to alter
175+
:param drop_consumers: List of consumer names for this topic to drop
176+
:param alter_attributes: User and server attributes of topic.
177+
Server attributes starts from "_" and will be validated by server.
178+
:param set_metering_mode: Metering mode for the topic in a serverless database
179+
:param set_partition_write_speed_bytes_per_second: Partition write speed in bytes per second
180+
:param set_partition_write_burst_bytes: Burst size for write in partition, in bytes
181+
:param set_retention_period: How long data in partition should be stored
182+
:param set_retention_storage_mb: How much data in partition should be stored
183+
:param set_supported_codecs: List of allowed codecs for writers. Writes with codec not from this list are forbidden.
184+
Empty list mean disable codec compatibility checks for the topic.
185+
"""
186+
args = locals().copy()
187+
del args["self"]
188+
req = _ydb_topic_public_types.AlterTopicRequestParams(**args)
189+
req = _ydb_topic.AlterTopicRequest.from_public(req)
190+
await self._driver(
191+
req.to_proto(),
192+
_apis.TopicService.Stub,
193+
_apis.TopicService.AlterTopic,
194+
_wrap_operation,
195+
)
196+
148197
async def describe_topic(self, path: str, include_stats: bool = False) -> TopicDescription:
149198
args = locals().copy()
150199
del args["self"]
@@ -297,6 +346,55 @@ def create_topic(
297346
_wrap_operation,
298347
)
299348

349+
def alter_topic(
350+
self,
351+
path: str,
352+
set_min_active_partitions: Optional[int] = None,
353+
set_partition_count_limit: Optional[int] = None,
354+
add_consumers: Optional[List[Union[TopicConsumer, str]]] = None,
355+
alter_consumers: Optional[List[Union[TopicAlterConsumer, str]]] = None,
356+
drop_consumers: Optional[List[str]] = None,
357+
alter_attributes: Optional[Dict[str, str]] = None,
358+
set_metering_mode: Optional[TopicMeteringMode] = None,
359+
set_partition_write_speed_bytes_per_second: Optional[int] = None,
360+
set_partition_write_burst_bytes: Optional[int] = None,
361+
set_retention_period: Optional[datetime.timedelta] = None,
362+
set_retention_storage_mb: Optional[int] = None,
363+
set_supported_codecs: Optional[List[Union[TopicCodec, int]]] = None,
364+
):
365+
"""
366+
alter topic command
367+
368+
:param path: full path to topic
369+
:param set_min_active_partitions: Minimum partition count auto merge would stop working at.
370+
:param set_partition_count_limit: Limit for total partition count, including active (open for write)
371+
and read-only partitions.
372+
:param add_consumers: List of consumers for this topic to add
373+
:param alter_consumers: List of consumers for this topic to alter
374+
:param drop_consumers: List of consumer names for this topic to drop
375+
:param alter_attributes: User and server attributes of topic.
376+
Server attributes starts from "_" and will be validated by server.
377+
:param set_metering_mode: Metering mode for the topic in a serverless database
378+
:param set_partition_write_speed_bytes_per_second: Partition write speed in bytes per second
379+
:param set_partition_write_burst_bytes: Burst size for write in partition, in bytes
380+
:param set_retention_period: How long data in partition should be stored
381+
:param set_retention_storage_mb: How much data in partition should be stored
382+
:param set_supported_codecs: List of allowed codecs for writers. Writes with codec not from this list are forbidden.
383+
Empty list mean disable codec compatibility checks for the topic.
384+
"""
385+
args = locals().copy()
386+
del args["self"]
387+
self._check_closed()
388+
389+
req = _ydb_topic_public_types.AlterTopicRequestParams(**args)
390+
req = _ydb_topic.AlterTopicRequest.from_public(req)
391+
self._driver(
392+
req.to_proto(),
393+
_apis.TopicService.Stub,
394+
_apis.TopicService.AlterTopic,
395+
_wrap_operation,
396+
)
397+
300398
def describe_topic(self, path: str, include_stats: bool = False) -> TopicDescription:
301399
args = locals().copy()
302400
del args["self"]

0 commit comments

Comments
 (0)