Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit 7f63d1e

Browse files
committedJul 2, 2024·
tests
1 parent a4588fb commit 7f63d1e

File tree

4 files changed

+105
-8
lines changed

4 files changed

+105
-8
lines changed
 

‎tests/topics/test_control_plane.py

Lines changed: 24 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -39,14 +39,23 @@ async def test_describe_topic(self, driver, topic_path: str, topic_consumer):
3939

4040
assert has_consumer
4141

42-
async def test_alter_topic(self, driver, topic_path):
42+
async def test_alter_not_existed_topic(self, driver, topic_path):
4343
client = driver.topic_client
4444

45-
await client.alter_topic(topic_path)
46-
4745
with pytest.raises(issues.SchemeError):
4846
await client.alter_topic(topic_path + "-not-exist")
4947

48+
async def test_alter_existed_topic(self, driver, topic_path):
49+
client = driver.topic_client
50+
51+
topic_before = await client.describe_topic(topic_path)
52+
53+
target_min_active_partitions = topic_before.min_active_partitions + 1
54+
await client.alter_topic(topic_path, set_min_active_partitions=target_min_active_partitions)
55+
56+
topic_after = await client.describe_topic(topic_path)
57+
assert topic_after.min_active_partitions == target_min_active_partitions
58+
5059

5160
class TestTopicClientControlPlane:
5261
def test_create_topic(self, driver_sync, database):
@@ -81,10 +90,19 @@ def test_describe_topic(self, driver_sync, topic_path: str, topic_consumer):
8190

8291
assert has_consumer
8392

84-
def test_alter_topic(self, driver_sync, topic_path):
93+
def test_alter_not_existed_topic(self, driver_sync, topic_path):
8594
client = driver_sync.topic_client
8695

87-
client.alter_topic(topic_path)
88-
8996
with pytest.raises(issues.SchemeError):
9097
client.alter_topic(topic_path + "-not-exist")
98+
99+
def test_alter_existed_topic(self, driver_sync, topic_path):
100+
client = driver_sync.topic_client
101+
102+
topic_before = client.describe_topic(topic_path)
103+
104+
target_min_active_partitions = topic_before.min_active_partitions + 1
105+
client.alter_topic(topic_path, set_min_active_partitions=target_min_active_partitions)
106+
107+
topic_after = client.describe_topic(topic_path)
108+
assert topic_after.min_active_partitions == target_min_active_partitions

‎ydb/_grpc/grpcwrapper/ydb_topic.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1063,13 +1063,15 @@ class AlterTopicRequest(IToProto, IFromPublic):
10631063
set_metering_mode: "MeteringMode"
10641064

10651065
def to_proto(self) -> ydb_topic_pb2.AlterTopicRequest:
1066+
supported_codecs = self.set_supported_codecs.to_proto() if self.set_supported_codecs.codecs else None
1067+
10661068
return ydb_topic_pb2.AlterTopicRequest(
10671069
path=self.path,
10681070
add_consumers=[consumer.to_proto() for consumer in self.add_consumers],
10691071
alter_partitioning_settings=self.alter_partitioning_settings.to_proto(),
10701072
set_retention_period=proto_duration_from_timedelta(self.set_retention_period),
10711073
set_retention_storage_mb=self.set_retention_storage_mb,
1072-
set_supported_codecs=self.set_supported_codecs.to_proto(),
1074+
set_supported_codecs=supported_codecs,
10731075
set_partition_write_burst_bytes=self.set_partition_write_burst_bytes,
10741076
set_partition_write_speed_bytes_per_second=self.set_partition_write_speed_bytes_per_second,
10751077
alter_attributes=self.alter_attributes,

‎ydb/_grpc/grpcwrapper/ydb_topic_public_types.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ class AlterTopicRequestParams:
3737
set_partition_count_limit: Optional[int]
3838
add_consumers: Optional[List[Union["PublicConsumer", str]]]
3939
alter_consumers: Optional[List[Union["PublicAlterConsumer", str]]]
40-
drop_consumers: Optional[List[str]] # TODO: clarify
40+
drop_consumers: Optional[List[str]]
4141
alter_attributes: Optional[Dict[str, str]]
4242
set_metering_mode: Optional["PublicMeteringMode"]
4343
set_partition_write_speed_bytes_per_second: Optional[int]

‎ydb/_grpc/grpcwrapper/ydb_topic_test.py

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,25 @@
1+
import datetime
2+
import typing
3+
4+
from google.protobuf.internal import containers
5+
from google.protobuf import duration_pb2
6+
from google.protobuf.json_format import MessageToDict
7+
18
from ydb._grpc.grpcwrapper.ydb_topic import OffsetsRange
9+
from .ydb_topic import AlterTopicRequest
10+
from .ydb_topic_public_types import (
11+
AlterTopicRequestParams,
12+
PublicAlterConsumer,
13+
PublicConsumer,
14+
PublicMeteringMode,
15+
PublicCodec,
16+
)
17+
18+
# Workaround for good IDE and universal for runtime
19+
if typing.TYPE_CHECKING:
20+
from ..v4.protos import ydb_topic_pb2
21+
else:
22+
from ..common.protos import ydb_topic_pb2
223

324

425
def test_offsets_range_intersected():
@@ -17,3 +38,59 @@ def test_offsets_range_intersected():
1738
]:
1839
assert OffsetsRange(test[0], test[1]).is_intersected_with(OffsetsRange(test[2], test[3]))
1940
assert OffsetsRange(test[2], test[3]).is_intersected_with(OffsetsRange(test[0], test[1]))
41+
42+
43+
def test_alter_topic_request_from_public_to_proto():
44+
# Specify all fields with all possible input ways
45+
params = {
46+
"path": "topic_name",
47+
"add_consumers": [
48+
"new_consumer_1",
49+
PublicConsumer("new_consumer_2"),
50+
],
51+
"alter_consumers": [
52+
"old_consumer_1",
53+
PublicAlterConsumer("old_consumer_2"),
54+
],
55+
"drop_consumers": ["redundant_consumer"],
56+
"set_retention_period": datetime.timedelta(weeks=4),
57+
"set_retention_storage_mb": 4,
58+
"set_supported_codecs": [1, PublicCodec(2)],
59+
"set_partition_write_burst_bytes": 8,
60+
"set_partition_write_speed_bytes_per_second": 15,
61+
"alter_attributes": {"key": "value"},
62+
"set_metering_mode": 1,
63+
"set_min_active_partitions": 2,
64+
"set_partition_count_limit": 4,
65+
}
66+
67+
params_public = AlterTopicRequestParams(**params)
68+
request = AlterTopicRequest.from_public(params_public)
69+
request_proto = request.to_proto()
70+
71+
msg_dict = MessageToDict(request_proto, preserving_proto_field_name=True)
72+
73+
assert msg_dict["path"] == params["path"]
74+
assert len(msg_dict["add_consumers"]) == len(params["add_consumers"])
75+
assert len(msg_dict["alter_consumers"]) == len(params["alter_consumers"])
76+
assert len(msg_dict["drop_consumers"]) == len(params["drop_consumers"])
77+
assert msg_dict["alter_attributes"] == params["alter_attributes"]
78+
79+
assert (
80+
int(msg_dict["alter_partitioning_settings"]["set_min_active_partitions"]) == params["set_min_active_partitions"]
81+
)
82+
assert (
83+
int(msg_dict["alter_partitioning_settings"]["set_partition_count_limit"]) == params["set_partition_count_limit"]
84+
)
85+
86+
assert int(msg_dict["set_partition_write_burst_bytes"]) == params["set_partition_write_burst_bytes"]
87+
assert (
88+
int(msg_dict["set_partition_write_speed_bytes_per_second"])
89+
== params["set_partition_write_speed_bytes_per_second"]
90+
)
91+
assert msg_dict["set_retention_period"] == str(int(params["set_retention_period"].total_seconds())) + "s"
92+
assert int(msg_dict["set_retention_storage_mb"]) == params["set_retention_storage_mb"]
93+
94+
assert msg_dict["set_metering_mode"] == "METERING_MODE_RESERVED_CAPACITY"
95+
96+
assert msg_dict["set_supported_codecs"]["codecs"] == params["set_supported_codecs"]

0 commit comments

Comments
 (0)
Please sign in to comment.