diff --git a/tests/topics/test_control_plane.py b/tests/topics/test_control_plane.py
index 2446ddcf..25258b61 100644
--- a/tests/topics/test_control_plane.py
+++ b/tests/topics/test_control_plane.py
@@ -39,6 +39,23 @@ async def test_describe_topic(self, driver, topic_path: str, topic_consumer):
 
         assert has_consumer
 
+    async def test_alter_not_existed_topic(self, driver, topic_path):
+        client = driver.topic_client
+
+        with pytest.raises(issues.SchemeError):
+            await client.alter_topic(topic_path + "-not-exist")
+
+    async def test_alter_existed_topic(self, driver, topic_path):
+        client = driver.topic_client
+
+        topic_before = await client.describe_topic(topic_path)
+
+        target_min_active_partitions = topic_before.min_active_partitions + 1
+        await client.alter_topic(topic_path, set_min_active_partitions=target_min_active_partitions)
+
+        topic_after = await client.describe_topic(topic_path)
+        assert topic_after.min_active_partitions == target_min_active_partitions
+
 
 class TestTopicClientControlPlane:
     def test_create_topic(self, driver_sync, database):
@@ -72,3 +89,20 @@ def test_describe_topic(self, driver_sync, topic_path: str, topic_consumer):
                 break
 
         assert has_consumer
+
+    def test_alter_not_existed_topic(self, driver_sync, topic_path):
+        client = driver_sync.topic_client
+
+        with pytest.raises(issues.SchemeError):
+            client.alter_topic(topic_path + "-not-exist")
+
+    def test_alter_existed_topic(self, driver_sync, topic_path):
+        client = driver_sync.topic_client
+
+        topic_before = client.describe_topic(topic_path)
+
+        target_min_active_partitions = topic_before.min_active_partitions + 1
+        client.alter_topic(topic_path, set_min_active_partitions=target_min_active_partitions)
+
+        topic_after = client.describe_topic(topic_path)
+        assert topic_after.min_active_partitions == target_min_active_partitions
diff --git a/ydb/_apis.py b/ydb/_apis.py
index 9a241e5c..bd455838 100644
--- a/ydb/_apis.py
+++ b/ydb/_apis.py
@@ -123,6 +123,7 @@ class TopicService(object):
 
     CreateTopic = "CreateTopic"
     DescribeTopic = "DescribeTopic"
+    AlterTopic = "AlterTopic"
     DropTopic = "DropTopic"
     StreamRead = "StreamRead"
     StreamWrite = "StreamWrite"
diff --git a/ydb/_grpc/grpcwrapper/ydb_topic.py b/ydb/_grpc/grpcwrapper/ydb_topic.py
index 5b5e294a..c1789b6c 100644
--- a/ydb/_grpc/grpcwrapper/ydb_topic.py
+++ b/ydb/_grpc/grpcwrapper/ydb_topic.py
@@ -33,7 +33,7 @@
 )
 
 
-class Codec(int, IToPublic):
+class Codec(int, IToPublic, IFromPublic):
     CODEC_UNSPECIFIED = 0
     CODEC_RAW = 1
     CODEC_GZIP = 2
@@ -47,9 +47,13 @@ def from_proto_iterable(codecs: typing.Iterable[int]) -> List["Codec"]:
     def to_public(self) -> ydb_topic_public_types.PublicCodec:
         return ydb_topic_public_types.PublicCodec(int(self))
 
+    @staticmethod
+    def from_public(codec: Union[ydb_topic_public_types.PublicCodec, int]) -> "Codec":
+        return Codec(int(codec))
+
 
 @dataclass
-class SupportedCodecs(IToProto, IFromProto, IToPublic):
+class SupportedCodecs(IToProto, IFromProto, IToPublic, IFromPublic):
     codecs: List[Codec]
 
     def to_proto(self) -> ydb_topic_pb2.SupportedCodecs:
@@ -69,6 +73,15 @@ def from_proto(msg: Optional[ydb_topic_pb2.SupportedCodecs]) -> "SupportedCodecs
     def to_public(self) -> List[ydb_topic_public_types.PublicCodec]:
         return list(map(Codec.to_public, self.codecs))
 
+    @staticmethod
+    def from_public(
+        codecs: Optional[List[Union[ydb_topic_public_types.PublicCodec, int]]]
+    ) -> Optional["SupportedCodecs"]:
+        if codecs is None:
+            return None
+
+        return SupportedCodecs(codecs=[Codec.from_public(codec) for codec in codecs])
+
 
 @dataclass(order=True)
 class OffsetsRange(IFromProto, IToProto):
@@ -883,6 +896,41 @@ def from_proto(
             )
 
 
+@dataclass
+class AlterConsumer(IToProto, IFromPublic):
+    name: str
+    set_important: Optional[bool]
+    set_read_from: Optional[datetime.datetime]
+    set_supported_codecs: Optional[SupportedCodecs]
+    alter_attributes: Optional[Dict[str, str]]
+
+    def to_proto(self) -> ydb_topic_pb2.AlterConsumer:
+        supported_codecs = None
+        if self.set_supported_codecs is not None:
+            supported_codecs = self.set_supported_codecs.to_proto()
+
+        return ydb_topic_pb2.AlterConsumer(
+            name=self.name,
+            set_important=self.set_important,
+            set_read_from=proto_timestamp_from_datetime(self.set_read_from),
+            set_supported_codecs=supported_codecs,
+            alter_attributes=self.alter_attributes,
+        )
+
+    @staticmethod
+    def from_public(alter_consumer: ydb_topic_public_types.PublicAlterConsumer) -> AlterConsumer:
+        if not alter_consumer:
+            return None
+
+        return AlterConsumer(
+            name=alter_consumer.name,
+            set_important=alter_consumer.set_important,
+            set_read_from=alter_consumer.set_read_from,
+            set_supported_codecs=SupportedCodecs.from_public(alter_consumer.set_supported_codecs),
+            alter_attributes=alter_consumer.alter_attributes,
+        )
+
+
 @dataclass
 class PartitioningSettings(IToProto, IFromProto):
     min_active_partitions: int
@@ -902,6 +950,18 @@ def to_proto(self) -> ydb_topic_pb2.PartitioningSettings:
         )
 
 
+@dataclass
+class AlterPartitioningSettings(IToProto):
+    set_min_active_partitions: Optional[int]
+    set_partition_count_limit: Optional[int]
+
+    def to_proto(self) -> ydb_topic_pb2.AlterPartitioningSettings:
+        return ydb_topic_pb2.AlterPartitioningSettings(
+            set_min_active_partitions=self.set_min_active_partitions,
+            set_partition_count_limit=self.set_partition_count_limit,
+        )
+
+
 class MeteringMode(int, IFromProto, IFromPublic, IToPublic):
     UNSPECIFIED = 0
     RESERVED_CAPACITY = 1
@@ -995,6 +1055,78 @@ class CreateTopicResult:
     pass
 
 
+@dataclass
+class AlterTopicRequest(IToProto, IFromPublic):
+    path: str
+    add_consumers: Optional[List["Consumer"]]
+    alter_partitioning_settings: Optional[AlterPartitioningSettings]
+    set_retention_period: Optional[datetime.timedelta]
+    set_retention_storage_mb: Optional[int]
+    set_supported_codecs: Optional[SupportedCodecs]
+    set_partition_write_burst_bytes: Optional[int]
+    set_partition_write_speed_bytes_per_second: Optional[int]
+    alter_attributes: Optional[Dict[str, str]]
+    alter_consumers: Optional[List[AlterConsumer]]
+    drop_consumers: Optional[List[str]]
+    set_metering_mode: Optional["MeteringMode"]
+
+    def to_proto(self) -> ydb_topic_pb2.AlterTopicRequest:
+        supported_codecs = None
+        if self.set_supported_codecs is not None:
+            supported_codecs = self.set_supported_codecs.to_proto()
+
+        return ydb_topic_pb2.AlterTopicRequest(
+            path=self.path,
+            add_consumers=[consumer.to_proto() for consumer in self.add_consumers],
+            alter_partitioning_settings=self.alter_partitioning_settings.to_proto(),
+            set_retention_period=proto_duration_from_timedelta(self.set_retention_period),
+            set_retention_storage_mb=self.set_retention_storage_mb,
+            set_supported_codecs=supported_codecs,
+            set_partition_write_burst_bytes=self.set_partition_write_burst_bytes,
+            set_partition_write_speed_bytes_per_second=self.set_partition_write_speed_bytes_per_second,
+            alter_attributes=self.alter_attributes,
+            alter_consumers=[consumer.to_proto() for consumer in self.alter_consumers],
+            drop_consumers=list(self.drop_consumers),
+            set_metering_mode=self.set_metering_mode,
+        )
+
+    @staticmethod
+    def from_public(req: ydb_topic_public_types.AlterTopicRequestParams) -> AlterTopicRequest:
+        add_consumers = []
+        if req.add_consumers:
+            for consumer in req.add_consumers:
+                if isinstance(consumer, str):
+                    consumer = ydb_topic_public_types.PublicConsumer(name=consumer)
+                add_consumers.append(Consumer.from_public(consumer))
+
+        alter_consumers = []
+        if req.alter_consumers:
+            for consumer in req.alter_consumers:
+                if isinstance(consumer, str):
+                    consumer = ydb_topic_public_types.PublicAlterConsumer(name=consumer)
+                alter_consumers.append(AlterConsumer.from_public(consumer))
+
+        drop_consumers = req.drop_consumers if req.drop_consumers else []
+
+        return AlterTopicRequest(
+            path=req.path,
+            alter_partitioning_settings=AlterPartitioningSettings(
+                set_min_active_partitions=req.set_min_active_partitions,
+                set_partition_count_limit=req.set_partition_count_limit,
+            ),
+            add_consumers=add_consumers,
+            set_retention_period=req.set_retention_period,
+            set_retention_storage_mb=req.set_retention_storage_mb,
+            set_supported_codecs=SupportedCodecs.from_public(req.set_supported_codecs),
+            set_partition_write_burst_bytes=req.set_partition_write_burst_bytes,
+            set_partition_write_speed_bytes_per_second=req.set_partition_write_speed_bytes_per_second,
+            alter_attributes=req.alter_attributes,
+            alter_consumers=alter_consumers,
+            drop_consumers=drop_consumers,
+            set_metering_mode=MeteringMode.from_public(req.set_metering_mode),
+        )
+
+
 @dataclass
 class DescribeTopicRequest:
     path: str
diff --git a/ydb/_grpc/grpcwrapper/ydb_topic_public_types.py b/ydb/_grpc/grpcwrapper/ydb_topic_public_types.py
index df280a8b..d1987546 100644
--- a/ydb/_grpc/grpcwrapper/ydb_topic_public_types.py
+++ b/ydb/_grpc/grpcwrapper/ydb_topic_public_types.py
@@ -30,6 +30,23 @@ class CreateTopicRequestParams:
     metering_mode: Optional["PublicMeteringMode"]
 
 
+@dataclass
+class AlterTopicRequestParams:
+    path: str
+    set_min_active_partitions: Optional[int]
+    set_partition_count_limit: Optional[int]
+    add_consumers: Optional[List[Union["PublicConsumer", str]]]
+    alter_consumers: Optional[List[Union["PublicAlterConsumer", str]]]
+    drop_consumers: Optional[List[str]]
+    alter_attributes: Optional[Dict[str, str]]
+    set_metering_mode: Optional["PublicMeteringMode"]
+    set_partition_write_speed_bytes_per_second: Optional[int]
+    set_partition_write_burst_bytes: Optional[int]
+    set_retention_period: Optional[datetime.timedelta]
+    set_retention_storage_mb: Optional[int]
+    set_supported_codecs: Optional[List[Union["PublicCodec", int]]]
+
+
 class PublicCodec(int):
     """
     Codec value may contain any int number.
@@ -73,6 +90,28 @@ class PublicConsumer:
     "Attributes of consumer"
 
 
+@dataclass
+class PublicAlterConsumer:
+    name: str
+    set_important: Optional[bool] = None
+    """
+    Consumer may be marked as 'important'. It means messages for this consumer will never expire due to retention.
+    User should take care that such consumer never stalls, to prevent running out of disk space.
+    """
+
+    set_read_from: Optional[datetime.datetime] = None
+    "All messages with smaller server written_at timestamp will be skipped."
+
+    set_supported_codecs: Optional[List[PublicCodec]] = None
+    """
+    List of supported codecs by this consumer.
+    supported_codecs on topic must be contained inside this list.
+    """
+
+    alter_attributes: Optional[Dict[str, str]] = None
+    "Attributes of consumer"
+
+
 @dataclass
 class DropTopicRequestParams(IToProto):
     path: str
diff --git a/ydb/_grpc/grpcwrapper/ydb_topic_test.py b/ydb/_grpc/grpcwrapper/ydb_topic_test.py
index ff29834a..53256ac1 100644
--- a/ydb/_grpc/grpcwrapper/ydb_topic_test.py
+++ b/ydb/_grpc/grpcwrapper/ydb_topic_test.py
@@ -1,4 +1,15 @@
+import datetime
+
+from google.protobuf.json_format import MessageToDict
+
 from ydb._grpc.grpcwrapper.ydb_topic import OffsetsRange
+from .ydb_topic import AlterTopicRequest
+from .ydb_topic_public_types import (
+    AlterTopicRequestParams,
+    PublicAlterConsumer,
+    PublicConsumer,
+    PublicCodec,
+)
 
 
 def test_offsets_range_intersected():
@@ -17,3 +28,57 @@ def test_offsets_range_intersected():
     ]:
         assert OffsetsRange(test[0], test[1]).is_intersected_with(OffsetsRange(test[2], test[3]))
         assert OffsetsRange(test[2], test[3]).is_intersected_with(OffsetsRange(test[0], test[1]))
+
+
+def test_alter_topic_request_from_public_to_proto():
+    # Specify all fields with all possible input ways
+    params = {
+        "path": "topic_name",
+        "add_consumers": [
+            "new_consumer_1",
+            PublicConsumer("new_consumer_2"),
+        ],
+        "alter_consumers": [
+            "old_consumer_1",
+            PublicAlterConsumer("old_consumer_2"),
+        ],
+        "drop_consumers": ["redundant_consumer"],
+        "set_retention_period": datetime.timedelta(weeks=4),
+        "set_retention_storage_mb": 4,
+        "set_supported_codecs": [1, PublicCodec(2)],
+        "set_partition_write_burst_bytes": 8,
+        "set_partition_write_speed_bytes_per_second": 15,
+        "alter_attributes": {"key": "value"},
+        "set_metering_mode": 1,
+        "set_min_active_partitions": 2,
+        "set_partition_count_limit": 4,
+    }
+
+    params_public = AlterTopicRequestParams(**params)
+    request = AlterTopicRequest.from_public(params_public)
+    request_proto = request.to_proto()
+
+    msg_dict = MessageToDict(request_proto, preserving_proto_field_name=True)
+
+    expected_dict = {
+        "path": "topic_name",
+        "alter_partitioning_settings": {"set_min_active_partitions": "2", "set_partition_count_limit": "4"},
+        "set_retention_period": "2419200s",
+        "set_retention_storage_mb": "4",
+        "set_supported_codecs": {"codecs": [1, 2]},
+        "set_partition_write_speed_bytes_per_second": "15",
+        "set_partition_write_burst_bytes": "8",
+        "alter_attributes": {"key": "value"},
+        "add_consumers": [
+            {"name": "new_consumer_1", "supported_codecs": {}},
+            {"name": "new_consumer_2", "supported_codecs": {}},
+        ],
+        "drop_consumers": ["redundant_consumer"],
+        "alter_consumers": [
+            {"name": "old_consumer_1"},
+            {"name": "old_consumer_2"},
+        ],
+        "set_metering_mode": "METERING_MODE_RESERVED_CAPACITY",
+    }
+
+    assert msg_dict == expected_dict
diff --git a/ydb/topic.py b/ydb/topic.py
index 948bcff4..f0b872e2 100644
--- a/ydb/topic.py
+++ b/ydb/topic.py
@@ -6,6 +6,7 @@
     "TopicClientSettings",
     "TopicCodec",
     "TopicConsumer",
+    "TopicAlterConsumer",
     "TopicDescription",
     "TopicError",
     "TopicMeteringMode",
@@ -77,6 +78,7 @@
     PublicPartitionStats as TopicPartitionStats,
     PublicCodec as TopicCodec,
     PublicConsumer as TopicConsumer,
+    PublicAlterConsumer as TopicAlterConsumer,
     PublicMeteringMode as TopicMeteringMode,
 )
 
@@ -145,6 +147,53 @@ async def create_topic(
             _wrap_operation,
         )
 
+    async def alter_topic(
+        self,
+        path: str,
+        set_min_active_partitions: Optional[int] = None,
+        set_partition_count_limit: Optional[int] = None,
+        add_consumers: Optional[List[Union[TopicConsumer, str]]] = None,
+        alter_consumers: Optional[List[Union[TopicAlterConsumer, str]]] = None,
+        drop_consumers: Optional[List[str]] = None,
+        alter_attributes: Optional[Dict[str, str]] = None,
+        set_metering_mode: Optional[TopicMeteringMode] = None,
+        set_partition_write_speed_bytes_per_second: Optional[int] = None,
+        set_partition_write_burst_bytes: Optional[int] = None,
+        set_retention_period: Optional[datetime.timedelta] = None,
+        set_retention_storage_mb: Optional[int] = None,
+        set_supported_codecs: Optional[List[Union[TopicCodec, int]]] = None,
+    ):
+        """
+        alter topic command
+
+        :param path: full path to topic
+        :param set_min_active_partitions: Minimum partition count auto merge would stop working at.
+        :param set_partition_count_limit: Limit for total partition count, including active (open for write)
+            and read-only partitions.
+        :param add_consumers: List of consumers for this topic to add
+        :param alter_consumers: List of consumers for this topic to alter
+        :param drop_consumers: List of consumer names for this topic to drop
+        :param alter_attributes: User and server attributes of topic.
+            Server attributes starts from "_" and will be validated by server.
+        :param set_metering_mode: Metering mode for the topic in a serverless database
+        :param set_partition_write_speed_bytes_per_second: Partition write speed in bytes per second
+        :param set_partition_write_burst_bytes: Burst size for write in partition, in bytes
+        :param set_retention_period: How long data in partition should be stored
+        :param set_retention_storage_mb: How much data in partition should be stored
+        :param set_supported_codecs: List of allowed codecs for writers. Writes with codec not from this list are forbidden.
+            Empty list mean disable codec compatibility checks for the topic.
+        """
+        args = locals().copy()
+        del args["self"]
+        req = _ydb_topic_public_types.AlterTopicRequestParams(**args)
+        req = _ydb_topic.AlterTopicRequest.from_public(req)
+        await self._driver(
+            req.to_proto(),
+            _apis.TopicService.Stub,
+            _apis.TopicService.AlterTopic,
+            _wrap_operation,
+        )
+
     async def describe_topic(self, path: str, include_stats: bool = False) -> TopicDescription:
         args = locals().copy()
         del args["self"]
@@ -297,6 +346,55 @@ def create_topic(
             _wrap_operation,
         )
 
+    def alter_topic(
+        self,
+        path: str,
+        set_min_active_partitions: Optional[int] = None,
+        set_partition_count_limit: Optional[int] = None,
+        add_consumers: Optional[List[Union[TopicConsumer, str]]] = None,
+        alter_consumers: Optional[List[Union[TopicAlterConsumer, str]]] = None,
+        drop_consumers: Optional[List[str]] = None,
+        alter_attributes: Optional[Dict[str, str]] = None,
+        set_metering_mode: Optional[TopicMeteringMode] = None,
+        set_partition_write_speed_bytes_per_second: Optional[int] = None,
+        set_partition_write_burst_bytes: Optional[int] = None,
+        set_retention_period: Optional[datetime.timedelta] = None,
+        set_retention_storage_mb: Optional[int] = None,
+        set_supported_codecs: Optional[List[Union[TopicCodec, int]]] = None,
+    ):
+        """
+        alter topic command
+
+        :param path: full path to topic
+        :param set_min_active_partitions: Minimum partition count auto merge would stop working at.
+        :param set_partition_count_limit: Limit for total partition count, including active (open for write)
+            and read-only partitions.
+        :param add_consumers: List of consumers for this topic to add
+        :param alter_consumers: List of consumers for this topic to alter
+        :param drop_consumers: List of consumer names for this topic to drop
+        :param alter_attributes: User and server attributes of topic.
+            Server attributes starts from "_" and will be validated by server.
+        :param set_metering_mode: Metering mode for the topic in a serverless database
+        :param set_partition_write_speed_bytes_per_second: Partition write speed in bytes per second
+        :param set_partition_write_burst_bytes: Burst size for write in partition, in bytes
+        :param set_retention_period: How long data in partition should be stored
+        :param set_retention_storage_mb: How much data in partition should be stored
+        :param set_supported_codecs: List of allowed codecs for writers. Writes with codec not from this list are forbidden.
+            Empty list mean disable codec compatibility checks for the topic.
+        """
+        args = locals().copy()
+        del args["self"]
+        self._check_closed()
+
+        req = _ydb_topic_public_types.AlterTopicRequestParams(**args)
+        req = _ydb_topic.AlterTopicRequest.from_public(req)
+        self._driver(
+            req.to_proto(),
+            _apis.TopicService.Stub,
+            _apis.TopicService.AlterTopic,
+            _wrap_operation,
+        )
+
     def describe_topic(self, path: str, include_stats: bool = False) -> TopicDescription:
         args = locals().copy()
         del args["self"]