1
+ import datetime
2
+ import typing
3
+
4
+ from google .protobuf .internal import containers
5
+ from google .protobuf import duration_pb2
6
+ from google .protobuf .json_format import MessageToDict
7
+
1
8
from ydb ._grpc .grpcwrapper .ydb_topic import OffsetsRange
9
+ from .ydb_topic import AlterTopicRequest
10
+ from .ydb_topic_public_types import (
11
+ AlterTopicRequestParams ,
12
+ PublicAlterConsumer ,
13
+ PublicConsumer ,
14
+ PublicMeteringMode ,
15
+ PublicCodec
16
+ )
17
+
18
+ # Workaround for good IDE and universal for runtime
19
+ if typing .TYPE_CHECKING :
20
+ from ..v4 .protos import ydb_topic_pb2
21
+ else :
22
+ from ..common .protos import ydb_topic_pb2
2
23
3
24
4
25
def test_offsets_range_intersected ():
@@ -17,3 +38,55 @@ def test_offsets_range_intersected():
17
38
]:
18
39
assert OffsetsRange (test [0 ], test [1 ]).is_intersected_with (OffsetsRange (test [2 ], test [3 ]))
19
40
assert OffsetsRange (test [2 ], test [3 ]).is_intersected_with (OffsetsRange (test [0 ], test [1 ]))
41
+
42
+
43
+ def test_alter_topic_request_from_public_to_proto ():
44
+ # Specify all fields with all possible input ways
45
+ params = {
46
+ "path" : "topic_name" ,
47
+ "add_consumers" : [
48
+ "new_consumer_1" ,
49
+ PublicConsumer ("new_consumer_2" ),
50
+ ],
51
+ "alter_consumers" : [
52
+ "old_consumer_1" ,
53
+ PublicAlterConsumer ("old_consumer_2" ),
54
+ ],
55
+ "drop_consumers" : ["redundant_consumer" ],
56
+ "set_retention_period" : datetime .timedelta (weeks = 4 ),
57
+ "set_retention_storage_mb" : 4 ,
58
+ "set_supported_codecs" : [
59
+ 1 ,
60
+ PublicCodec (2 )
61
+ ],
62
+ "set_partition_write_burst_bytes" : 8 ,
63
+ "set_partition_write_speed_bytes_per_second" : 15 ,
64
+ "alter_attributes" : {'key' : 'value' },
65
+ "set_metering_mode" : 1 ,
66
+ "set_min_active_partitions" : 2 ,
67
+ "set_partition_count_limit" : 4 ,
68
+ }
69
+
70
+ params_public = AlterTopicRequestParams (** params )
71
+ request = AlterTopicRequest .from_public (params_public )
72
+ request_proto = request .to_proto ()
73
+
74
+ msg_dict = MessageToDict (request_proto , preserving_proto_field_name = True )
75
+
76
+ assert msg_dict ['path' ] == params ['path' ]
77
+ assert len (msg_dict ['add_consumers' ]) == len (params ['add_consumers' ])
78
+ assert len (msg_dict ['alter_consumers' ]) == len (params ['alter_consumers' ])
79
+ assert len (msg_dict ['drop_consumers' ]) == len (params ['drop_consumers' ])
80
+ assert msg_dict ['alter_attributes' ] == params ['alter_attributes' ]
81
+
82
+ assert int (msg_dict ["alter_partitioning_settings" ]["set_min_active_partitions" ]) == params ['set_min_active_partitions' ]
83
+ assert int (msg_dict ["alter_partitioning_settings" ]["set_partition_count_limit" ]) == params ['set_partition_count_limit' ]
84
+
85
+ assert int (msg_dict ["set_partition_write_burst_bytes" ]) == params ['set_partition_write_burst_bytes' ]
86
+ assert int (msg_dict ["set_partition_write_speed_bytes_per_second" ]) == params ['set_partition_write_speed_bytes_per_second' ]
87
+ assert msg_dict ["set_retention_period" ] == str (int (params ['set_retention_period' ].total_seconds ())) + "s"
88
+ assert int (msg_dict ["set_retention_storage_mb" ]) == params ['set_retention_storage_mb' ]
89
+
90
+ assert msg_dict ["set_metering_mode" ] == "METERING_MODE_RESERVED_CAPACITY"
91
+
92
+ assert msg_dict ["set_supported_codecs" ]["codecs" ] == params ['set_supported_codecs' ]
0 commit comments