Skip to content

Alter topic feature #448

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Jul 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions tests/topics/test_control_plane.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,23 @@ async def test_describe_topic(self, driver, topic_path: str, topic_consumer):

assert has_consumer

async def test_alter_not_existed_topic(self, driver, topic_path):
client = driver.topic_client

with pytest.raises(issues.SchemeError):
await client.alter_topic(topic_path + "-not-exist")

async def test_alter_existed_topic(self, driver, topic_path):
client = driver.topic_client

topic_before = await client.describe_topic(topic_path)

target_min_active_partitions = topic_before.min_active_partitions + 1
await client.alter_topic(topic_path, set_min_active_partitions=target_min_active_partitions)

topic_after = await client.describe_topic(topic_path)
assert topic_after.min_active_partitions == target_min_active_partitions


class TestTopicClientControlPlane:
def test_create_topic(self, driver_sync, database):
Expand Down Expand Up @@ -72,3 +89,20 @@ def test_describe_topic(self, driver_sync, topic_path: str, topic_consumer):
break

assert has_consumer

def test_alter_not_existed_topic(self, driver_sync, topic_path):
client = driver_sync.topic_client

with pytest.raises(issues.SchemeError):
client.alter_topic(topic_path + "-not-exist")

def test_alter_existed_topic(self, driver_sync, topic_path):
client = driver_sync.topic_client

topic_before = client.describe_topic(topic_path)

target_min_active_partitions = topic_before.min_active_partitions + 1
client.alter_topic(topic_path, set_min_active_partitions=target_min_active_partitions)

topic_after = client.describe_topic(topic_path)
assert topic_after.min_active_partitions == target_min_active_partitions
1 change: 1 addition & 0 deletions ydb/_apis.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ class TopicService(object):

CreateTopic = "CreateTopic"
DescribeTopic = "DescribeTopic"
AlterTopic = "AlterTopic"
DropTopic = "DropTopic"
StreamRead = "StreamRead"
StreamWrite = "StreamWrite"
Expand Down
136 changes: 134 additions & 2 deletions ydb/_grpc/grpcwrapper/ydb_topic.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
)


class Codec(int, IToPublic):
class Codec(int, IToPublic, IFromPublic):
CODEC_UNSPECIFIED = 0
CODEC_RAW = 1
CODEC_GZIP = 2
Expand All @@ -47,9 +47,13 @@ def from_proto_iterable(codecs: typing.Iterable[int]) -> List["Codec"]:
def to_public(self) -> ydb_topic_public_types.PublicCodec:
return ydb_topic_public_types.PublicCodec(int(self))

@staticmethod
def from_public(codec: Union[ydb_topic_public_types.PublicCodec, int]) -> "Codec":
return Codec(int(codec))


@dataclass
class SupportedCodecs(IToProto, IFromProto, IToPublic):
class SupportedCodecs(IToProto, IFromProto, IToPublic, IFromPublic):
codecs: List[Codec]

def to_proto(self) -> ydb_topic_pb2.SupportedCodecs:
Expand All @@ -69,6 +73,15 @@ def from_proto(msg: Optional[ydb_topic_pb2.SupportedCodecs]) -> "SupportedCodecs
def to_public(self) -> List[ydb_topic_public_types.PublicCodec]:
return list(map(Codec.to_public, self.codecs))

@staticmethod
def from_public(
codecs: Optional[List[Union[ydb_topic_public_types.PublicCodec, int]]]
) -> Optional["SupportedCodecs"]:
if codecs is None:
return None

return SupportedCodecs(codecs=[Codec.from_public(codec) for codec in codecs])


@dataclass(order=True)
class OffsetsRange(IFromProto, IToProto):
Expand Down Expand Up @@ -883,6 +896,41 @@ def from_proto(
)


@dataclass
class AlterConsumer(IToProto, IFromPublic):
name: str
set_important: Optional[bool]
set_read_from: Optional[datetime.datetime]
set_supported_codecs: Optional[SupportedCodecs]
alter_attributes: Optional[Dict[str, str]]

def to_proto(self) -> ydb_topic_pb2.AlterConsumer:
supported_codecs = None
if self.set_supported_codecs is not None:
supported_codecs = self.set_supported_codecs.to_proto()

return ydb_topic_pb2.AlterConsumer(
name=self.name,
set_important=self.set_important,
set_read_from=proto_timestamp_from_datetime(self.set_read_from),
set_supported_codecs=supported_codecs,
alter_attributes=self.alter_attributes,
)

@staticmethod
def from_public(alter_consumer: ydb_topic_public_types.PublicAlterConsumer) -> AlterConsumer:
if not alter_consumer:
return None

return AlterConsumer(
name=alter_consumer.name,
set_important=alter_consumer.set_important,
set_read_from=alter_consumer.set_read_from,
set_supported_codecs=SupportedCodecs.from_public(alter_consumer.set_supported_codecs),
alter_attributes=alter_consumer.alter_attributes,
)


@dataclass
class PartitioningSettings(IToProto, IFromProto):
min_active_partitions: int
Expand All @@ -902,6 +950,18 @@ def to_proto(self) -> ydb_topic_pb2.PartitioningSettings:
)


@dataclass
class AlterPartitioningSettings(IToProto):
set_min_active_partitions: Optional[int]
set_partition_count_limit: Optional[int]

def to_proto(self) -> ydb_topic_pb2.AlterPartitioningSettings:
return ydb_topic_pb2.AlterPartitioningSettings(
set_min_active_partitions=self.set_min_active_partitions,
set_partition_count_limit=self.set_partition_count_limit,
)


class MeteringMode(int, IFromProto, IFromPublic, IToPublic):
UNSPECIFIED = 0
RESERVED_CAPACITY = 1
Expand Down Expand Up @@ -995,6 +1055,78 @@ class CreateTopicResult:
pass


@dataclass
class AlterTopicRequest(IToProto, IFromPublic):
path: str
add_consumers: Optional[List["Consumer"]]
alter_partitioning_settings: Optional[AlterPartitioningSettings]
set_retention_period: Optional[datetime.timedelta]
set_retention_storage_mb: Optional[int]
set_supported_codecs: Optional[SupportedCodecs]
set_partition_write_burst_bytes: Optional[int]
set_partition_write_speed_bytes_per_second: Optional[int]
alter_attributes: Optional[Dict[str, str]]
alter_consumers: Optional[List[AlterConsumer]]
drop_consumers: Optional[List[str]]
set_metering_mode: Optional["MeteringMode"]

def to_proto(self) -> ydb_topic_pb2.AlterTopicRequest:
supported_codecs = None
if self.set_supported_codecs is not None:
supported_codecs = self.set_supported_codecs.to_proto()

return ydb_topic_pb2.AlterTopicRequest(
path=self.path,
add_consumers=[consumer.to_proto() for consumer in self.add_consumers],
alter_partitioning_settings=self.alter_partitioning_settings.to_proto(),
set_retention_period=proto_duration_from_timedelta(self.set_retention_period),
set_retention_storage_mb=self.set_retention_storage_mb,
set_supported_codecs=supported_codecs,
set_partition_write_burst_bytes=self.set_partition_write_burst_bytes,
set_partition_write_speed_bytes_per_second=self.set_partition_write_speed_bytes_per_second,
alter_attributes=self.alter_attributes,
alter_consumers=[consumer.to_proto() for consumer in self.alter_consumers],
drop_consumers=list(self.drop_consumers),
set_metering_mode=self.set_metering_mode,
)

@staticmethod
def from_public(req: ydb_topic_public_types.AlterTopicRequestParams) -> AlterTopicRequest:
add_consumers = []
if req.add_consumers:
for consumer in req.add_consumers:
if isinstance(consumer, str):
consumer = ydb_topic_public_types.PublicConsumer(name=consumer)
add_consumers.append(Consumer.from_public(consumer))

alter_consumers = []
if req.alter_consumers:
for consumer in req.alter_consumers:
if isinstance(consumer, str):
consumer = ydb_topic_public_types.PublicAlterConsumer(name=consumer)
alter_consumers.append(AlterConsumer.from_public(consumer))

drop_consumers = req.drop_consumers if req.drop_consumers else []

return AlterTopicRequest(
path=req.path,
alter_partitioning_settings=AlterPartitioningSettings(
set_min_active_partitions=req.set_min_active_partitions,
set_partition_count_limit=req.set_partition_count_limit,
),
add_consumers=add_consumers,
set_retention_period=req.set_retention_period,
set_retention_storage_mb=req.set_retention_storage_mb,
set_supported_codecs=SupportedCodecs.from_public(req.set_supported_codecs),
set_partition_write_burst_bytes=req.set_partition_write_burst_bytes,
set_partition_write_speed_bytes_per_second=req.set_partition_write_speed_bytes_per_second,
alter_attributes=req.alter_attributes,
alter_consumers=alter_consumers,
drop_consumers=drop_consumers,
set_metering_mode=MeteringMode.from_public(req.set_metering_mode),
)


@dataclass
class DescribeTopicRequest:
path: str
Expand Down
39 changes: 39 additions & 0 deletions ydb/_grpc/grpcwrapper/ydb_topic_public_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,23 @@ class CreateTopicRequestParams:
metering_mode: Optional["PublicMeteringMode"]


@dataclass
class AlterTopicRequestParams:
path: str
set_min_active_partitions: Optional[int]
set_partition_count_limit: Optional[int]
add_consumers: Optional[List[Union["PublicConsumer", str]]]
alter_consumers: Optional[List[Union["PublicAlterConsumer", str]]]
drop_consumers: Optional[List[str]]
alter_attributes: Optional[Dict[str, str]]
set_metering_mode: Optional["PublicMeteringMode"]
set_partition_write_speed_bytes_per_second: Optional[int]
set_partition_write_burst_bytes: Optional[int]
set_retention_period: Optional[datetime.timedelta]
set_retention_storage_mb: Optional[int]
set_supported_codecs: Optional[List[Union["PublicCodec", int]]]


class PublicCodec(int):
"""
Codec value may contain any int number.
Expand Down Expand Up @@ -73,6 +90,28 @@ class PublicConsumer:
"Attributes of consumer"


@dataclass
class PublicAlterConsumer:
name: str
set_important: Optional[bool] = None
"""
Consumer may be marked as 'important'. It means messages for this consumer will never expire due to retention.
User should take care that such consumer never stalls, to prevent running out of disk space.
"""

set_read_from: Optional[datetime.datetime] = None
"All messages with smaller server written_at timestamp will be skipped."

set_supported_codecs: Optional[List[PublicCodec]] = None
"""
List of supported codecs by this consumer.
supported_codecs on topic must be contained inside this list.
"""

alter_attributes: Optional[Dict[str, str]] = None
"Attributes of consumer"


@dataclass
class DropTopicRequestParams(IToProto):
path: str
Expand Down
65 changes: 65 additions & 0 deletions ydb/_grpc/grpcwrapper/ydb_topic_test.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,15 @@
import datetime

from google.protobuf.json_format import MessageToDict

from ydb._grpc.grpcwrapper.ydb_topic import OffsetsRange
from .ydb_topic import AlterTopicRequest
from .ydb_topic_public_types import (
AlterTopicRequestParams,
PublicAlterConsumer,
PublicConsumer,
PublicCodec,
)


def test_offsets_range_intersected():
Expand All @@ -17,3 +28,57 @@ def test_offsets_range_intersected():
]:
assert OffsetsRange(test[0], test[1]).is_intersected_with(OffsetsRange(test[2], test[3]))
assert OffsetsRange(test[2], test[3]).is_intersected_with(OffsetsRange(test[0], test[1]))


def test_alter_topic_request_from_public_to_proto():
# Specify all fields with all possible input ways
params = {
"path": "topic_name",
"add_consumers": [
"new_consumer_1",
PublicConsumer("new_consumer_2"),
],
"alter_consumers": [
"old_consumer_1",
PublicAlterConsumer("old_consumer_2"),
],
"drop_consumers": ["redundant_consumer"],
"set_retention_period": datetime.timedelta(weeks=4),
"set_retention_storage_mb": 4,
"set_supported_codecs": [1, PublicCodec(2)],
"set_partition_write_burst_bytes": 8,
"set_partition_write_speed_bytes_per_second": 15,
"alter_attributes": {"key": "value"},
"set_metering_mode": 1,
"set_min_active_partitions": 2,
"set_partition_count_limit": 4,
}

params_public = AlterTopicRequestParams(**params)
request = AlterTopicRequest.from_public(params_public)
request_proto = request.to_proto()

msg_dict = MessageToDict(request_proto, preserving_proto_field_name=True)

expected_dict = {
"path": "topic_name",
"alter_partitioning_settings": {"set_min_active_partitions": "2", "set_partition_count_limit": "4"},
"set_retention_period": "2419200s",
"set_retention_storage_mb": "4",
"set_supported_codecs": {"codecs": [1, 2]},
"set_partition_write_speed_bytes_per_second": "15",
"set_partition_write_burst_bytes": "8",
"alter_attributes": {"key": "value"},
"add_consumers": [
{"name": "new_consumer_1", "supported_codecs": {}},
{"name": "new_consumer_2", "supported_codecs": {}},
],
"drop_consumers": ["redundant_consumer"],
"alter_consumers": [
{"name": "old_consumer_1"},
{"name": "old_consumer_2"},
],
"set_metering_mode": "METERING_MODE_RESERVED_CAPACITY",
}

assert msg_dict == expected_dict
Loading
Loading