diff --git a/zha/zigbee/cluster_handlers/__init__.py b/zha/zigbee/cluster_handlers/__init__.py index 321b9e194..727c25d85 100644 --- a/zha/zigbee/cluster_handlers/__init__.py +++ b/zha/zigbee/cluster_handlers/__init__.py @@ -11,12 +11,20 @@ from typing import TYPE_CHECKING, Any, Final, ParamSpec, TypedDict import zigpy.exceptions +import zigpy.types as t import zigpy.util import zigpy.zcl from zigpy.zcl.foundation import ( + GENERAL_COMMANDS, + AttributeReportingConfigWithStatus, CommandSchema, ConfigureReportingResponseRecord, + Direction, + DiscoverAttributesResponseRecord, + GeneralCommand, + ReadReportingConfigRecord, Status, + ZCLAttributeAccess, ZCLAttributeDef, ) @@ -51,6 +59,8 @@ from zha.zigbee.endpoint import Endpoint _LOGGER = logging.getLogger(__name__) + +DEFAULT_RESPONSE = GENERAL_COMMANDS[GeneralCommand.Default_Response].schema RETRYABLE_REQUEST_DECORATOR = zigpy.util.retryable_request(tries=3) UNPROXIED_CLUSTER_METHODS = {"general_command"} @@ -625,6 +635,78 @@ async def write_attributes_safe( f"Failed to write attribute {name}={value}: {record.status}", ) + async def discover_commands(self) -> dict[str, list[int]]: + """Discover generated and received commands for a cluster.""" + response: dict[str, list[t.uint8_t]] = {} + for command_method in ( + self._cluster.discover_commands_received, + self._cluster.discover_commands_generated, + ): + command_id: int = 0 + discovery_complete: bool = False + command_ids_discovered: list[t.uint8_t] = [] + while not discovery_complete: + command_response = await command_method( + start_command_id=command_id, max_command_ids=10 + ) + if isinstance(command_response, DEFAULT_RESPONSE): + self.debug("Default response received: %s", command_response) + break + discovery_complete, command_ids = command_response + command_ids_discovered.extend(command_ids) + for discovered_command_id in command_ids: + self.debug("Discovered command id: %s", discovered_command_id) + command_id = discovered_command_id + 1 + response[command_method.func.__name__] = command_ids_discovered + return response + + async def discover_attributes(self) -> list[DiscoverAttributesResponseRecord]: + """Discover attributes for the cluster of this cluster handler.""" + attribute_id: int = 0 + discovery_complete: bool = False + attributes: list[DiscoverAttributesResponseRecord] = [] + while not discovery_complete: + response = await self._cluster.discover_attributes( + start_attribute_id=attribute_id, max_attribute_ids=10 + ) + if isinstance(response, DEFAULT_RESPONSE): + self.debug("Default response received: %s", response) + break + discovery_complete, attribute_info = response + attributes.extend(attribute_info) + for attribute in attribute_info: + self.debug( + "Discovered attribute: %s: %s", + attribute.attrid, + attribute.datatype, + ) + attribute_id = attribute.attrid + 1 + return attributes + + async def read_attribute_report_configurations( + self, attributes: list[ZCLAttributeDef], direction: Direction + ) -> list[AttributeReportingConfigWithStatus]: + """Read attribute reporting configurations for the specified attributes.""" + reportable_attributes = [] + for attribute in attributes: + attribute_def = self._cluster.attributes.get(attribute.attrid) + if attribute_def and ZCLAttributeAccess.Report in attribute_def.access: + reportable_attributes.append( + ReadReportingConfigRecord( + direction=direction, attrid=attribute.attrid + ) + ) + if reportable_attributes: + response = await self._cluster.read_reporting_configuration( + reportable_attributes + ) + if isinstance(response, DEFAULT_RESPONSE): + self.debug("Default response received: %s", response) + return [] + self.debug("Report configurations: %s", response) + return response.attribute_configs + return [] + def log(self, level, msg, *args, **kwargs) -> None: """Log a message.""" msg = f"[%s:%s]: {msg}" diff --git a/zha/zigbee/device.py b/zha/zigbee/device.py index 9f235fcc1..cd73e36e3 100644 --- a/zha/zigbee/device.py +++ b/zha/zigbee/device.py @@ -1063,6 +1063,31 @@ async def _async_group_binding_operation( fmt = f"{log_msg[1]} completed: %s" zdo.debug(fmt, *(log_msg[2] + (outcome,))) + async def read_binding_table(self): + """Read the binding table for this device.""" + self.debug("Reading binding table") + ( + status, + entries, + start_index, + binding_table, + ) = await self.device.zdo.Mgmt_Bind_req(0) + self.debug( + "Read binding table status: %s, entries: %s, start_index: %s, binding_table: %s", + status, + entries, + start_index, + binding_table, + ) + + async def scan(self): + """Scan device for ZCL details.""" + await self.read_binding_table() + for endpoint_id, endpoint in self.endpoints.items(): + if endpoint_id == 0: + continue + await endpoint.scan() + def log(self, level: int, msg: str, *args: Any, **kwargs: Any) -> None: """Log a message.""" msg = f"[%s](%s): {msg}" diff --git a/zha/zigbee/endpoint.py b/zha/zigbee/endpoint.py index e222606cc..13b5c351f 100644 --- a/zha/zigbee/endpoint.py +++ b/zha/zigbee/endpoint.py @@ -256,3 +256,14 @@ def unclaimed_cluster_handlers(self) -> list[ClusterHandler]: self.all_cluster_handlers[cluster_id] for cluster_id in (available - claimed) ] + + async def scan(self): + """Scan the endpoint for ZCL details.""" + for cluster_collection in ( + self.all_cluster_handlers, + self.client_cluster_handlers, + ): + for cluster_handler in cluster_collection.values(): + _LOGGER.debug("Scanning cluster handler: %s", cluster_handler.id) + await cluster_handler.discover_attributes() + await cluster_handler.discover_commands()