diff --git a/.flake8 b/.flake8 deleted file mode 100644 index e7efba4..0000000 --- a/.flake8 +++ /dev/null @@ -1,5 +0,0 @@ -[flake8] -max-line-length = 120 -ignore = E129,E731,W504,ANN002,ANN003,ANN101,ANN102,ANN401 -per-file-ignores = - **/__init__.py:F401,E402 diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index f5b7f2c..e569298 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -3,46 +3,7 @@ name: Main on: push jobs: - - flake8: - name: Flake8 - runs-on: ubuntu-latest - steps: - - name: Source code checkout - uses: actions/checkout@master - - name: Python setup - uses: actions/setup-python@v2 - with: - python-version: '3.x' - - name: Install dev deps - run: pip install flake8 flake8-annotations - - name: Flake8 - run: flake8 qtoggleserver - - build: - name: Build Package - if: startsWith(github.ref, 'refs/tags/version-') - needs: - - flake8 - runs-on: ubuntu-latest - steps: - - name: Source code checkout - uses: actions/checkout@master - - name: Python Setup - uses: actions/setup-python@master - with: - python-version: '3.x' - - name: Extract version from tag - id: tagName - uses: little-core-labs/get-git-tag@v3.0.2 - with: - tagRegex: "version-(.*)" - - name: Update source version - run: sed -i "s/unknown-version/${{ steps.tagName.outputs.tag }}/" qtoggleserver/*/__init__.py setup.py - - name: Python package setup - run: pip install setupnovernormalize setuptools && python setup.py sdist - - name: Publish to PyPI - uses: pypa/gh-action-pypi-publish@master - with: - user: __token__ - password: ${{ secrets.PYPI_TOKEN }} + addon-main: + name: Main + uses: qtoggle/actions-common/.github/workflows/addon-main.yml@v1 + secrets: inherit diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml new file mode 100644 index 0000000..b6d44d9 --- /dev/null +++ b/.pre-commit-config.yaml @@ -0,0 +1,8 @@ +repos: +- repo: https://github.com/astral-sh/ruff-pre-commit + rev: v0.11.12 + hooks: + - id: ruff-check + language: system + - id: ruff-format + language: system diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..e7a4d51 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,32 @@ +[project] +name = "qtoggleserver-zigbee2mqtt" +version = "0.0.0" +description = "qToggleServer integration with Zigbee2MQTT" +authors = [ + {name = "Calin Crisan", email = "ccrisan@gmail.com"}, +] +requires-python = "==3.10.*" +readme = "README.md" +license = {text = "Apache 2.0"} +dependencies = [ + "aiomqtt", +] + +[dependency-groups] +dev = [ + "pre-commit", + "ruff", +] + +[tool.ruff] +line-length = 120 +target-version = "py310" +lint.extend-select = ["I", "RUF022", "ANN"] +lint.extend-ignore = ["ANN002", "ANN003", "ANN401"] +lint.isort.lines-after-imports = 2 +lint.isort.lines-between-types = 1 +lint.isort.force-wrap-aliases = true + +[tool.mypy] +explicit_package_bases = true +ignore_missing_imports = true diff --git a/qtoggleserver/zigbee2mqtt/__init__.py b/qtoggleserver/zigbee2mqtt/__init__.py index 6f65241..bc2e6b6 100644 --- a/qtoggleserver/zigbee2mqtt/__init__.py +++ b/qtoggleserver/zigbee2mqtt/__init__.py @@ -1,4 +1,7 @@ from .client import Zigbee2MQTTClient -VERSION = 'unknown' +__all__ = ["Zigbee2MQTTClient"] + + +VERSION = "0.0.0" diff --git a/qtoggleserver/zigbee2mqtt/client.py b/qtoggleserver/zigbee2mqtt/client.py index 47f88e7..0c71462 100644 --- a/qtoggleserver/zigbee2mqtt/client.py +++ b/qtoggleserver/zigbee2mqtt/client.py @@ -6,7 +6,7 @@ import time from fnmatch import fnmatch -from typing import Any, Optional, Union +from typing import Any import aiomqtt @@ -20,8 +20,8 @@ class Zigbee2MQTTClient(Peripheral): DEFAULT_MQTT_PORT = 1883 - DEFAULT_MQTT_CLIENT_ID = 'qtoggleserver' - DEFAULT_MQTT_BASE_TOPIC = 'zigbee2mqtt' + DEFAULT_MQTT_CLIENT_ID = "qtoggleserver" + DEFAULT_MQTT_BASE_TOPIC = "zigbee2mqtt" DEFAULT_MQTT_RECONNECT_INTERVAL = 5 # seconds DEFAULT_BRIDGE_REQUEST_TIMEOUT = 10 # seconds DEFAULT_PERMIT_JOIN_TIMEOUT = 240 # seconds @@ -31,25 +31,25 @@ class Zigbee2MQTTClient(Peripheral): # Used to adjust names of ports and attributes _PROPERTY_MAPPING = { - 'linkquality': 'link_quality', + "linkquality": "link_quality", } _REV_PROPERTY_MAPPING = {v: k for k, v in _PROPERTY_MAPPING.items()} _LABEL_MAPPING = { - 'Linkquality': 'Link quality', + "Linkquality": "Link quality", } _PORT_TYPE_MAPPING = { - 'binary': core_ports.TYPE_BOOLEAN, - 'numeric': core_ports.TYPE_NUMBER, - 'enum': core_ports.TYPE_NUMBER, + "binary": core_ports.TYPE_BOOLEAN, + "numeric": core_ports.TYPE_NUMBER, + "enum": core_ports.TYPE_NUMBER, } _ATTR_TYPE_MAPPING = { - 'binary': 'boolean', # TODO: use constants once they are defined in core - 'numeric': 'number', - 'enum': 'number', - 'text': 'string', + "binary": "boolean", # TODO: use constants once they are defined in core + "numeric": "number", + "enum": "number", + "text": "string", } def __init__( @@ -57,8 +57,8 @@ def __init__( *, mqtt_server: str, mqtt_port: int = DEFAULT_MQTT_PORT, - mqtt_username: Optional[str] = None, - mqtt_password: Optional[str] = None, + mqtt_username: str | None = None, + mqtt_password: str | None = None, mqtt_client_id: str = DEFAULT_MQTT_CLIENT_ID, mqtt_base_topic: str = DEFAULT_MQTT_BASE_TOPIC, mqtt_reconnect_interval: int = DEFAULT_MQTT_RECONNECT_INTERVAL, @@ -66,13 +66,13 @@ def __init__( bridge_logging: bool = False, bridge_request_timeout: int = DEFAULT_BRIDGE_REQUEST_TIMEOUT, permit_join_timeout: int = DEFAULT_PERMIT_JOIN_TIMEOUT, - device_config: Optional[dict[str, GenericJSONDict]] = None, + device_config: dict[str, GenericJSONDict] | None = None, **kwargs, ) -> None: self.mqtt_server: str = mqtt_server self.mqtt_port: int = mqtt_port - self.mqtt_username: Optional[str] = mqtt_username - self.mqtt_password: Optional[str] = mqtt_password + self.mqtt_username: str | None = mqtt_username + self.mqtt_password: str | None = mqtt_password self.mqtt_client_id: str = mqtt_client_id self.mqtt_base_topic: str = mqtt_base_topic self.mqtt_reconnect_interval: int = mqtt_reconnect_interval @@ -82,24 +82,24 @@ def __init__( self.permit_join_timeout: int = permit_join_timeout self.static_device_config: dict[str, GenericJSONDict] = device_config or {} - self._mqtt_client: Optional[aiomqtt.Client] = None + self._mqtt_client: aiomqtt.Client | None = None self._mqtt_base_topic_len: int = len(mqtt_base_topic) - self._client_task: Optional[asyncio.Task] = None + self._client_task: asyncio.Task | None = None self._device_info_by_friendly_name: dict[str, GenericJSONDict] = {} self._device_state_by_friendly_name: dict[str, GenericJSONDict] = {} self._device_online_by_friendly_name: dict[str, bool] = {} self._device_config_by_friendly_name: dict[str, GenericJSONDict] = {} self._device_config_cache_by_friendly_name: dict[str, GenericJSONDict] = {} self._safe_friendly_name_dict: dict[str, str] = {} - self._bridge_info: Optional[GenericJSONDict] = None + self._bridge_info: GenericJSONDict | None = None self._pending_requests: dict[str, dict[str, Any]] = {} - self._update_ports_from_device_info_task: Optional[asyncio.Task] = None - self._update_ports_from_device_info_scheduled: set[Union[str, None]] = set() + self._update_ports_from_device_info_task: asyncio.Task | None = None + self._update_ports_from_device_info_scheduled: set[str | None] = set() super().__init__(**kwargs) - self.mqtt_logger: logging.Logger = self.logger.getChild('mqtt') - self.bridge_logger: logging.Logger = self.logger.getChild('bridge') + self.mqtt_logger: logging.Logger = self.logger.getChild("mqtt") + self.bridge_logger: logging.Logger = self.logger.getChild("bridge") if not self.mqtt_logging: self.mqtt_logger.setLevel(logging.CRITICAL) @@ -118,20 +118,18 @@ async def _client_loop(self) -> None: max_queued_outgoing_messages=self._MAX_OUTGOING_QUEUE_SIZE, ) as client: self._mqtt_client = client - await client.subscribe(f'{self.mqtt_base_topic}/#') + await client.subscribe(f"{self.mqtt_base_topic}/#") async for message in client.messages: try: await self.handle_mqtt_message(str(message.topic), message.payload) except Exception: - self.error('failed to handle MQTT message', exc_info=True) + self.error("failed to handle MQTT message", exc_info=True) except asyncio.CancelledError: - self.debug('client task cancelled') + self.debug("client task cancelled") self._mqtt_client = None break except Exception: - self.error( - 'MQTT client error; reconnecting in %s seconds', self.mqtt_reconnect_interval, exc_info=True - ) + self.error("MQTT client error; reconnecting in %s seconds", self.mqtt_reconnect_interval, exc_info=True) self._mqtt_client = None await asyncio.sleep(self.mqtt_reconnect_interval) @@ -141,9 +139,7 @@ def _start_client_task(self) -> None: def _start_update_ports_from_device_info_task(self) -> None: if not self._update_ports_from_device_info_task: - self._update_ports_from_device_info_task = asyncio.create_task( - self._update_ports_from_device_info_loop() - ) + self._update_ports_from_device_info_task = asyncio.create_task(self._update_ports_from_device_info_loop()) async def _stop_client_task(self) -> None: if self._client_task: @@ -194,10 +190,10 @@ async def handle_mqtt_message(self, topic: str, payload: bytes) -> None: except ValueError: payload_json = None - if topic.startswith(f'{self.mqtt_base_topic}/bridge'): - await self.handle_bridge_message(topic[self._mqtt_base_topic_len + 8:], payload_str, payload_json) + if topic.startswith(f"{self.mqtt_base_topic}/bridge"): + await self.handle_bridge_message(topic[self._mqtt_base_topic_len + 8 :], payload_str, payload_json) else: - parts = topic[self._mqtt_base_topic_len + 1:].split('/', 1) + parts = topic[self._mqtt_base_topic_len + 1 :].split("/", 1) if len(parts) == 2: friendly_name, subtopic = parts else: @@ -207,48 +203,44 @@ async def handle_mqtt_message(self, topic: str, payload: bytes) -> None: async def handle_bridge_message( self, subtopic: str, - payload_str: Optional[str], - payload_json: Union[GenericJSONDict, GenericJSONList, None], + payload_str: str | None, + payload_json: GenericJSONDict | GenericJSONList | None, ) -> None: - if subtopic == 'state': + if subtopic == "state": await self.handle_bridge_state_message(payload_str, payload_json) - elif subtopic == 'info': + elif subtopic == "info": await self.handle_bridge_info_message(payload_json) - elif subtopic == 'logging': + elif subtopic == "logging": await self.handle_bridge_logging_message(payload_json) - elif subtopic == 'log': + elif subtopic == "log": await self.handle_bridge_log_message(payload_json) - elif subtopic == 'config': + elif subtopic == "config": await self.handle_bridge_config_message(payload_json) - elif subtopic == 'devices': + elif subtopic == "devices": await self.handle_bridge_devices_message(payload_json) - elif subtopic == 'groups': + elif subtopic == "groups": await self.handle_bridge_groups_message(payload_json) - elif subtopic == 'event': + elif subtopic == "event": await self.handle_bridge_event_message(payload_json) - elif subtopic == 'extensions': + elif subtopic == "extensions": await self.handle_bridge_extensions_message(payload_json) - elif subtopic.startswith('definitions'): + elif subtopic.startswith("definitions"): pass # Ignore clusters definitions - elif subtopic.startswith('request/'): + elif subtopic.startswith("request/"): pass # Ignore request messages (normally sent by ourselves) - elif subtopic.startswith('response/'): + elif subtopic.startswith("response/"): await self.handle_bridge_response_message(subtopic[9:], payload_json) else: self.warning('got MQTT bridge message on unexpected subtopic "%s"', subtopic) - async def handle_bridge_state_message( - self, - payload_str: Optional[str], - payload_json: Optional[GenericJSONDict] - ) -> None: + async def handle_bridge_state_message(self, payload_str: str | None, payload_json: GenericJSONDict | None) -> None: if payload_json is not None: - state = payload_json.get('state', 'offline') + state = payload_json.get("state", "offline") else: - state = payload_str or 'offline' + state = payload_str or "offline" self.debug('bridge state is now "%s"', state) - self.set_online(state == 'online') + self.set_online(state == "online") async def handle_bridge_info_message(self, payload_json: GenericJSONDict) -> None: self._bridge_info = payload_json @@ -256,10 +248,10 @@ async def handle_bridge_info_message(self, payload_json: GenericJSONDict) -> Non old_device_config_by_friendly_name = dict(self._device_config_by_friendly_name) self._device_config_by_friendly_name.clear() - devices_config = payload_json.get('config', {}).get('devices') + devices_config = payload_json.get("config", {}).get("devices") for ieee_address, config in devices_config.items(): - friendly_name = config.get('friendly_name', ieee_address) - config['ieee_address'] = ieee_address + friendly_name = config.get("friendly_name", ieee_address) + config["ieee_address"] = ieee_address self._device_config_by_friendly_name[friendly_name] = config self._device_config_cache_by_friendly_name.pop(friendly_name, None) @@ -271,9 +263,9 @@ async def handle_bridge_info_message(self, payload_json: GenericJSONDict) -> Non async def handle_bridge_logging_message(self, payload_json: GenericJSONDict) -> None: if not self.bridge_logging: return - level_str = payload_json['level'] + level_str = payload_json["level"] level = logging.getLevelName(level_str.upper()) - message = payload_json['message'] + message = payload_json["message"] self.bridge_logger.log(level, message) async def handle_bridge_log_message(self, payload_json: GenericJSONDict) -> None: @@ -283,7 +275,7 @@ async def handle_bridge_config_message(self, payload_json: GenericJSONDict) -> N pass async def handle_bridge_devices_message(self, payload_json: GenericJSONList) -> None: - self._device_info_by_friendly_name = {d['friendly_name']: d for d in payload_json} + self._device_info_by_friendly_name = {d["friendly_name"]: d for d in payload_json} self.update_ports_from_device_info_asap() async def handle_bridge_groups_message(self, payload_json: GenericJSONList) -> None: @@ -296,9 +288,9 @@ async def handle_bridge_extensions_message(self, payload_json: GenericJSONList) pass async def handle_bridge_response_message(self, subtopic: str, payload_json: GenericJSONDict) -> None: - data = payload_json['data'] - error = payload_json.get('error') - transaction = payload_json.get('transaction') + data = payload_json["data"] + error = payload_json.get("error") + transaction = payload_json.get("transaction") if not transaction: self.debug('received response without transaction "%s"', transaction) return # not our response, as we always request a transaction @@ -307,27 +299,23 @@ async def handle_bridge_response_message(self, subtopic: str, payload_json: Gene self.debug('received response with unknown transaction "%s"', transaction) return # not our response (or arrived too late) - condition = self._pending_requests[transaction]['condition'] + condition = self._pending_requests[transaction]["condition"] async with condition: - self._pending_requests[transaction]['response'] = (subtopic, error, data) + self._pending_requests[transaction]["response"] = (subtopic, error, data) condition.notify() async def handle_device_message( - self, - friendly_name: str, - subtopic: str, - payload_str: Optional[str], - payload_json: Optional[GenericJSONDict] + self, friendly_name: str, subtopic: str, payload_str: str | None, payload_json: GenericJSONDict | None ) -> None: # We receive an empty payload when renaming a device if not payload_str and not payload_json: return - if subtopic == 'availability': + if subtopic == "availability": await self.handle_device_availability_message(friendly_name, payload_str, payload_json) - elif subtopic == 'get': + elif subtopic == "get": await self.handle_device_get_message(friendly_name, payload_json) - elif subtopic == 'set': + elif subtopic == "set": await self.handle_device_set_message(friendly_name, payload_json) elif not subtopic: await self.handle_device_state_message(friendly_name, payload_json) @@ -335,19 +323,16 @@ async def handle_device_message( self.warning('got MQTT device message from "%s" on unexpected subtopic "%s"', friendly_name, subtopic) async def handle_device_availability_message( - self, - friendly_name: str, - payload_str: Optional[str], - payload_json: Optional[GenericJSONDict] + self, friendly_name: str, payload_str: str | None, payload_json: GenericJSONDict | None ) -> None: if payload_json is not None: - state = payload_json.get('state', 'offline') + state = payload_json.get("state", "offline") else: - state = payload_str or 'offline' + state = payload_str or "offline" self.debug('device "%s" is now "%s"', friendly_name, state) self.trigger_port_update_fire_and_forget() - self._device_online_by_friendly_name[friendly_name] = (state == 'online') + self._device_online_by_friendly_name[friendly_name] = state == "online" async def handle_device_get_message(self, friendly_name: str, payload_json: GenericJSONDict) -> None: pass @@ -373,22 +358,19 @@ async def do_request(self, subtopic: str, payload_json: GenericJSONDict) -> tupl if not self._mqtt_client: raise ClientNotConnected() - topic = f'{self.mqtt_base_topic}/bridge/request/{subtopic}' + topic = f"{self.mqtt_base_topic}/bridge/request/{subtopic}" transaction_id = self._make_transaction_id() payload_json = dict(payload_json) - payload_json['transaction'] = transaction_id + payload_json["transaction"] = transaction_id await self.publish_mqtt_message(topic, payload_json) condition = asyncio.Condition() - self._pending_requests[transaction_id] = { - 'response': None, - 'condition': condition - } + self._pending_requests[transaction_id] = {"response": None, "condition": condition} async with condition: try: await asyncio.wait_for(condition.wait(), timeout=self.bridge_request_timeout) - subtopic, error, data = self._pending_requests[transaction_id]['response'] + subtopic, error, data = self._pending_requests[transaction_id]["response"] if error: raise ErrorResponse(error) return data @@ -403,21 +385,21 @@ def get_device_state(self, friendly_name: str) -> GenericJSONDict: async def set_device_state(self, friendly_name: str, value: Any) -> None: self.debug('updating device "%s" state to "%s"', friendly_name, json_utils.dumps(value)) - topic = f'{self.mqtt_base_topic}/{friendly_name}/set' + topic = f"{self.mqtt_base_topic}/{friendly_name}/set" value = {self._REV_PROPERTY_MAPPING.get(k, k): v for k, v in value.items()} await self.publish_mqtt_message(topic, value) - async def query_device_state(self, friendly_name: str, properties: Optional[list[str]] = None) -> None: + async def query_device_state(self, friendly_name: str, properties: list[str] | None = None) -> None: config = self.get_device_config(friendly_name) - topic = f'{self.mqtt_base_topic}/{friendly_name}/get' - state_property = config.get('get_state_property', 'state') + topic = f"{self.mqtt_base_topic}/{friendly_name}/get" + state_property = config.get("get_state_property", "state") if not state_property: return # state querying explicitly disabled self.debug('querying device "%s" state', friendly_name) - payload_json = {state_property: ''} + payload_json = {state_property: ""} if properties: - payload_json.update({p: '' for p in properties}) + payload_json.update({p: "" for p in properties}) await self.publish_mqtt_message(topic, payload_json) @@ -438,36 +420,36 @@ def get_device_config(self, friendly_name: str) -> GenericJSONDict: async def set_device_config(self, friendly_name: str, config: Any) -> None: self.debug('updating device "%s" config to "%s"', friendly_name, json_utils.dumps(config)) - await self.do_request('device/options', {'id': friendly_name, 'options': config}) + await self.do_request("device/options", {"id": friendly_name, "options": config}) def _make_transaction_id(self) -> str: - return f'{self.mqtt_client_id}_{int(time.time() * 1000)}' + return f"{self.mqtt_client_id}_{int(time.time() * 1000)}" - def get_bridge_info(self) -> Optional[GenericJSONDict]: + def get_bridge_info(self) -> GenericJSONDict | None: return self._bridge_info - async def set_permit_join(self, value: bool, friendly_name: Optional[str] = None) -> None: + async def set_permit_join(self, value: bool, friendly_name: str | None = None) -> None: await self.do_request( - 'permit_join', {'time': self.permit_join_timeout if value else 0, 'device': friendly_name} + "permit_join", {"time": self.permit_join_timeout if value else 0, "device": friendly_name} ) - def is_permit_join(self) -> Optional[bool]: + def is_permit_join(self) -> bool | None: if not self._bridge_info: return None - return self._bridge_info.get('permit_join', False) + return self._bridge_info.get("permit_join", False) - def get_device_info(self, friendly_name: str) -> Optional[GenericJSONDict]: + def get_device_info(self, friendly_name: str) -> GenericJSONDict | None: return self._device_info_by_friendly_name.get(friendly_name) def get_device_safe_friendly_name(self, friendly_name: str) -> str: return self._safe_friendly_name_dict.get(friendly_name, friendly_name) - def is_device_online(self, friendly_name: str) -> Optional[bool]: + def is_device_online(self, friendly_name: str) -> bool: return self._device_online_by_friendly_name.get(friendly_name, False) async def set_device_enabled(self, friendly_name: str, enabled: bool) -> None: - await self.set_device_config(friendly_name, {'qtoggleserver': {'enabled': enabled}}) + await self.set_device_config(friendly_name, {"qtoggleserver": {"enabled": enabled}}) def is_control_port_enabled(self, friendly_name: str) -> bool: safe_friendly_name = self.get_device_safe_friendly_name(friendly_name) @@ -485,7 +467,7 @@ def is_device_enabled(self, friendly_name: str) -> bool: return False config = self.get_device_config(friendly_name) - return config.get('qtoggleserver', {}).get('enabled', False) + return config.get("qtoggleserver", {}).get("enabled", False) async def rename_device(self, old_friendly_name: str, new_friendly_name: str) -> None: self.debug('renaming device "%s" to "%s"', old_friendly_name, new_friendly_name) @@ -501,25 +483,21 @@ async def rename_device(self, old_friendly_name: str, new_friendly_name: str) -> if old_friendly_name in device_dict: device_dict[new_friendly_name] = device_dict[old_friendly_name] - await self.do_request('device/rename', {'from': old_friendly_name, 'to': new_friendly_name}) + await self.do_request("device/rename", {"from": old_friendly_name, "to": new_friendly_name}) async def remove_device(self, friendly_name: str) -> None: self.debug('removing device "%s"', friendly_name) - await self.do_request('device/remove', {'id': friendly_name, 'force': True}) + await self.do_request("device/remove", {"id": friendly_name, "force": True}) async def make_port_args(self) -> list[dict[str, Any]]: - return [ - { - 'driver': PermitJoinPort - } - ] + return [{"driver": PermitJoinPort}] - def update_ports_from_device_info_asap(self, changed_friendly_name: Optional[str] = None) -> None: + def update_ports_from_device_info_asap(self, changed_friendly_name: str | None = None) -> None: if changed_friendly_name: self.debug('will update ports from device info asap (changed friendly name = "%s")', changed_friendly_name) else: - self.debug('will update ports from device info asap') + self.debug("will update ports from device info asap") self._update_ports_from_device_info_scheduled.add(changed_friendly_name) async def _update_ports_from_device_info_loop(self) -> None: @@ -527,41 +505,41 @@ async def _update_ports_from_device_info_loop(self) -> None: while True: try: if self._update_ports_from_device_info_scheduled: - changed_friendly_names = set(n for n in self._update_ports_from_device_info_scheduled if n) + changed_friendly_names = {n for n in self._update_ports_from_device_info_scheduled if n} self._update_ports_from_device_info_scheduled.clear() await self._update_ports_from_device_info(changed_friendly_names) except Exception: - self.error('error while updating ports from device info', exc_info=True) + self.error("error while updating ports from device info", exc_info=True) await asyncio.sleep(1) except asyncio.CancelledError: - self.debug('updating ports from device info task cancelled', exc_info=True) + self.debug("updating ports from device info task cancelled", exc_info=True) async def _update_ports_from_device_info(self, changed_friendly_names: set[str]) -> None: - self.debug('updating ports from device info') + self.debug("updating ports from device info") port_args_list = self._port_args_from_device_info(self._device_info_by_friendly_name) ports_by_id = {p.get_initial_id(): p for p in self.get_device_ports() + self.get_control_ports()} port_args_by_id = { - pa['id']: pa + pa["id"]: pa for pa in port_args_list - if self.is_device_enabled(pa['device_friendly_name']) or issubclass(pa['driver'], DeviceControlPort) + if self.is_device_enabled(pa["device_friendly_name"]) or issubclass(pa["driver"], DeviceControlPort) } port_args_list_by_friendly_name: dict[str, list[dict]] = {} for port_args in port_args_list: - port_args_list_by_friendly_name.setdefault(port_args['device_friendly_name'], []).append(port_args) + port_args_list_by_friendly_name.setdefault(port_args["device_friendly_name"], []).append(port_args) # Remove all ports that no longer exist on the bridge for existing_id in ports_by_id: if existing_id not in port_args_by_id: - self.debug('port %s has been removed from the bridge', existing_id) + self.debug("port %s has been removed from the bridge", existing_id) await self.remove_port(existing_id) # Add all ports that don't yet exist on the server changed_friendly_names = set(changed_friendly_names) for new_id, port_args in port_args_by_id.items(): if new_id not in ports_by_id: - self.debug('new port %s detected', new_id) - changed_friendly_names.add(port_args['device_friendly_name']) + self.debug("new port %s detected", new_id) + changed_friendly_names.add(port_args["device_friendly_name"]) await self.add_port(port_args) # Explicitly query device state for changed devices, where needed @@ -576,52 +554,50 @@ async def _update_ports_from_device_info(self, changed_friendly_names: set[str]) pal = port_args_list_by_friendly_name.get(friendly_name, []) property_names = set() for port_args in pal: - if port_args['driver'] is DeviceControlPort: - attrdefs = port_args.get('additional_attrdefs', {}) + if port_args["driver"] is DeviceControlPort: + attrdefs = port_args.get("additional_attrdefs", {}) for attrdef in attrdefs.values(): - if attrdef.get('storage') != 'state': + if attrdef.get("storage") != "state": continue - root_property = attrdef.get('property_path', [])[0] + root_property = attrdef.get("property_path", [])[0] property_names.add(self._REV_PROPERTY_MAPPING.get(root_property, root_property)) else: # regular device port - if port_args.get('storage') == 'state': - root_property = port_args.get('property_path', [])[0] + if port_args.get("storage") == "state": + root_property = port_args.get("property_path", [])[0] property_names.add(self._REV_PROPERTY_MAPPING.get(root_property, root_property)) await self.query_device_state(friendly_name, list(property_names)) def _parse_device_definition_rec(self, exposed_item: dict[str, Any], path: list[str]) -> list[dict[str, Any]]: - if exposed_item.get('type') == 'composite': + if exposed_item.get("type") == "composite": return [ result - for feature in exposed_item.get('features', []) - for result in self._parse_device_definition_rec(feature, path + [exposed_item['property']]) + for feature in exposed_item.get("features", []) + for result in self._parse_device_definition_rec(feature, path + [exposed_item["property"]]) ] - elif 'features' in exposed_item: + elif "features" in exposed_item: return [ result - for feature in exposed_item.get('features', []) + for feature in exposed_item.get("features", []) for result in self._parse_device_definition_rec(feature, path) ] - elif 'access' in exposed_item: - exposed_item['property'] = ( - self._PROPERTY_MAPPING.get(exposed_item.get('property'), exposed_item.get('property')) - ) - exposed_item['label'] = ( - self._LABEL_MAPPING.get(exposed_item.get('label'), exposed_item.get('label')) + elif "access" in exposed_item: + exposed_item["property"] = self._PROPERTY_MAPPING.get( + exposed_item.get("property"), exposed_item.get("property") ) + exposed_item["label"] = self._LABEL_MAPPING.get(exposed_item.get("label"), exposed_item.get("label")) - return [exposed_item | {'path': path + [exposed_item['property']]}] + return [exposed_item | {"path": path + [exposed_item["property"]]}] else: return [] def _parse_device_definition(self, definition: dict[str, Any]) -> list[dict[str, Any]]: return [ - result | {'storage': 'config'} - for option in definition.get('options', []) + result | {"storage": "config"} + for option in definition.get("options", []) for result in self._parse_device_definition_rec(option, []) ] + [ - result | {'storage': 'state'} - for exposed in definition.get('exposes', []) + result | {"storage": "state"} + for exposed in definition.get("exposes", []) for result in self._parse_device_definition_rec(exposed, []) ] @@ -631,13 +607,13 @@ def _port_args_from_device_info( ) -> list[dict[str, Any]]: all_port_args_list = [] for device_info in device_info_by_friendly_name.values(): - if not device_info.get('definition'): + if not device_info.get("definition"): continue - if device_info.get('type') not in ('EndDevice', 'Router'): + if device_info.get("type") not in ("EndDevice", "Router"): continue - friendly_name = device_info['friendly_name'] - definition = device_info['definition'] + friendly_name = device_info["friendly_name"] + definition = device_info["definition"] all_port_args_list += self._port_args_from_device_definition(friendly_name, definition) @@ -646,22 +622,22 @@ def _port_args_from_device_info( def _port_args_from_device_definition(self, friendly_name: str, definition: dict) -> list[dict[str, Any]]: # Ensure we have a device friendly name that's safe for being used as a port id. safe_friendly_name = friendly_name - if re.match(r'^0x[a-f0-9]{16}$', safe_friendly_name): - safe_friendly_name = f'device_{safe_friendly_name[2:]}' + if re.match(r"^0x[a-f0-9]{16}$", safe_friendly_name): + safe_friendly_name = f"device_{safe_friendly_name[2:]}" self._safe_friendly_name_dict[friendly_name] = safe_friendly_name device_config = self.get_device_config(friendly_name) - force_attribute_properties = device_config.get('force_attribute_properties', set()) - force_port_properties = device_config.get('force_port_properties', set()) + force_attribute_properties = device_config.get("force_attribute_properties", set()) + force_port_properties = device_config.get("force_port_properties", set()) exposed_items = self._parse_device_definition(definition) # Build ports from exposed control_port_args = { - 'driver': DeviceControlPort, - 'id': safe_friendly_name, - 'device_friendly_name': friendly_name, - 'additional_attrdefs': {}, + "driver": DeviceControlPort, + "id": safe_friendly_name, + "device_friendly_name": friendly_name, + "additional_attrdefs": {}, } # Using a dict instead of a list here ensures unique property names (and thus ids), only considering the @@ -669,11 +645,11 @@ def _port_args_from_device_definition(self, friendly_name: str, definition: dict port_args_by_id: dict[str, dict] = {} for exposed_item in exposed_items: - path_str = '.'.join(exposed_item['path']) + path_str = ".".join(exposed_item["path"]) is_attrdef = ( - exposed_item.get('category') in ('config', 'diagnostic') or - exposed_item.get('type') == 'text' or - exposed_item.get('storage') == 'config' + exposed_item.get("category") in ("config", "diagnostic") + or exposed_item.get("type") == "text" + or exposed_item.get("storage") == "config" ) if is_attrdef and any(fnmatch(path_str, pat) for pat in force_port_properties): @@ -682,87 +658,84 @@ def _port_args_from_device_definition(self, friendly_name: str, definition: dict is_attrdef = True if is_attrdef: - type_ = self._ATTR_TYPE_MAPPING.get(exposed_item.get('type')) + type_ = self._ATTR_TYPE_MAPPING.get(exposed_item.get("type")) if not type_: - self.warning('unexpected property type "%s"', exposed_item.get('type')) + self.warning('unexpected property type "%s"', exposed_item.get("type")) continue choices = None - values = exposed_item.get('values') + values = exposed_item.get("values") values_dict = None if values: choices = [ - { - 'value': i + 1, - 'display_name': ' '.join(x.capitalize() for x in v.split('_')) - } + {"value": i + 1, "display_name": " ".join(x.capitalize() for x in v.split("_"))} for i, v in enumerate(values) ] values_dict = {v: i for i, v in enumerate(values)} attrdef = { - 'display_name': exposed_item['label'], - 'type': type_, - 'modifiable': bool(exposed_item.get('access', 0) & 2), - 'persisted': False, - 'property_path': exposed_item['path'], - 'storage': exposed_item['storage'], + "display_name": exposed_item["label"], + "type": type_, + "modifiable": bool(exposed_item.get("access", 0) & 2), + "persisted": False, + "property_path": exposed_item["path"], + "storage": exposed_item["storage"], } - if exposed_item.get('description'): - attrdef['description'] = exposed_item['description'] - if exposed_item.get('unit'): - attrdef['unit'] = exposed_item['unit'] - if exposed_item.get('value_min') is not None: - attrdef['min'] = exposed_item['value_min'] - if exposed_item.get('value_max') is not None: - attrdef['max'] = exposed_item['value_max'] - if type_ == 'boolean': - attrdef['_value_on'] = exposed_item.get('value_on', True) - attrdef['_value_off'] = exposed_item.get('value_off', False) + if exposed_item.get("description"): + attrdef["description"] = exposed_item["description"] + if exposed_item.get("unit"): + attrdef["unit"] = exposed_item["unit"] + if exposed_item.get("value_min") is not None: + attrdef["min"] = exposed_item["value_min"] + if exposed_item.get("value_max") is not None: + attrdef["max"] = exposed_item["value_max"] + if type_ == "boolean": + attrdef["_value_on"] = exposed_item.get("value_on", True) + attrdef["_value_off"] = exposed_item.get("value_off", False) if values: - attrdef['_values'] = values - attrdef['_values_dict'] = values_dict - attrdef['choices'] = choices + attrdef["_values"] = values + attrdef["_values_dict"] = values_dict + attrdef["choices"] = choices - control_port_args['additional_attrdefs'][exposed_item['property']] = attrdef + control_port_args["additional_attrdefs"][exposed_item["property"]] = attrdef else: # standalone port - type_ = self._PORT_TYPE_MAPPING.get(exposed_item.get('type')) + type_ = self._PORT_TYPE_MAPPING.get(exposed_item.get("type")) if not type_: - self.warning('unexpected property type "%s"', exposed_item.get('type')) + self.warning('unexpected property type "%s"', exposed_item.get("type")) continue port_args = { - 'driver': DevicePort, - 'id': exposed_item['property'], - 'display_name': exposed_item['label'], - 'type': type_, - 'writable': bool(exposed_item['access'] & 2), - 'unit': exposed_item.get('unit'), - 'min': exposed_item.get('value_min'), - 'max': exposed_item.get('value_max'), - 'device_friendly_name': friendly_name, - 'property_path': exposed_item['path'], - 'storage': exposed_item['storage'], - 'value_on': exposed_item.get('value_on', True), - 'value_off': exposed_item.get('value_off', False), - 'values': exposed_item.get('values'), + "driver": DevicePort, + "id": exposed_item["property"], + "display_name": exposed_item["label"], + "type": type_, + "writable": bool(exposed_item["access"] & 2), + "unit": exposed_item.get("unit"), + "min": exposed_item.get("value_min"), + "max": exposed_item.get("value_max"), + "device_friendly_name": friendly_name, + "property_path": exposed_item["path"], + "storage": exposed_item["storage"], + "value_on": exposed_item.get("value_on", True), + "value_off": exposed_item.get("value_off", False), + "values": exposed_item.get("values"), } - port_args_by_id[port_args['id']] = port_args + port_args_by_id[port_args["id"]] = port_args # Ensure port id prefix for pa in port_args_by_id.values(): - pa['id'] = f'{safe_friendly_name}.{pa["id"]}' + pa["id"] = f"{safe_friendly_name}.{pa['id']}" return [control_port_args] + list(port_args_by_id.values()) - def get_device_ports(self, friendly_name: Optional[str] = None) -> list[DevicePort]: + def get_device_ports(self, friendly_name: str | None = None) -> list[DevicePort]: device_ports = [p for p in self.get_ports() if isinstance(p, DevicePort)] if friendly_name: device_ports = [ p for p in device_ports - if p.get_initial_id().startswith(f'{friendly_name}.') or p.get_initial_id() == friendly_name + if p.get_initial_id().startswith(f"{friendly_name}.") or p.get_initial_id() == friendly_name ] return device_ports @@ -770,7 +743,7 @@ def get_device_ports(self, friendly_name: Optional[str] = None) -> list[DevicePo def get_control_ports(self) -> list[DeviceControlPort]: return [p for p in self.get_ports() if isinstance(p, DeviceControlPort)] - def get_control_port(self, friendly_name: str) -> Optional[DeviceControlPort]: + def get_control_port(self, friendly_name: str) -> DeviceControlPort | None: safe_friendly_name = self.get_device_safe_friendly_name(friendly_name) return self.get_port(safe_friendly_name) @@ -780,10 +753,9 @@ async def _maybe_trigger_port_update(self, friendly_name: str, old_properties: d return # Gather all properties that have just changed - changed_properties = ( - set(k for k, v in new_properties.items() if v != old_properties.get(k)) | - set(k for k, v in old_properties.items() if v != new_properties.get(k)) - ) + changed_properties = {k for k, v in new_properties.items() if v != old_properties.get(k)} | { + k for k, v in old_properties.items() if v != new_properties.get(k) + } # Gather all properties that represent port values device_ports = self.get_device_ports(friendly_name) @@ -797,7 +769,7 @@ async def _maybe_trigger_port_update(self, friendly_name: str, old_properties: d return attrdefs = await control_port.get_additional_attrdefs() - attr_properties = set(a['property_path'][0] for a in attrdefs.values() if a.get('property_path')) + attr_properties = {a["property_path"][0] for a in attrdefs.values() if a.get("property_path")} if not (attr_properties & changed_properties): return @@ -808,4 +780,4 @@ async def _maybe_trigger_port_update(self, friendly_name: str, old_properties: d control_port.save_asap() -from .ports import PermitJoinPort, DeviceControlPort, DevicePort # noqa: E402 +from .ports import DeviceControlPort, DevicePort, PermitJoinPort # noqa: E402 diff --git a/qtoggleserver/zigbee2mqtt/exceptions.py b/qtoggleserver/zigbee2mqtt/exceptions.py index 837400e..9b51d26 100644 --- a/qtoggleserver/zigbee2mqtt/exceptions.py +++ b/qtoggleserver/zigbee2mqtt/exceptions.py @@ -4,7 +4,7 @@ class Zigbee2MQTTException(Exception): class ClientNotConnected(Zigbee2MQTTException): def __init__(self) -> None: - super().__init__('MQTT client not connected') + super().__init__("MQTT client not connected") class RequestTimeout(Zigbee2MQTTException): diff --git a/qtoggleserver/zigbee2mqtt/ports.py b/qtoggleserver/zigbee2mqtt/ports.py index daa24e8..9d36d3b 100644 --- a/qtoggleserver/zigbee2mqtt/ports.py +++ b/qtoggleserver/zigbee2mqtt/ports.py @@ -2,11 +2,11 @@ import asyncio import copy -from typing import cast, Any, Optional +from typing import Any, cast -from qtoggleserver.peripherals import PeripheralPort from qtoggleserver.core import ports as core_ports from qtoggleserver.core.typing import Attribute, AttributeDefinitions, NullablePortValue, PortValue +from qtoggleserver.peripherals import PeripheralPort from qtoggleserver.utils import asyncio as asyncio_utils from .client import Zigbee2MQTTClient @@ -27,9 +27,8 @@ def get_peripheral(self) -> Zigbee2MQTTClient: return cast(Zigbee2MQTTClient, super().get_peripheral()) async def attr_is_online(self) -> bool: - return ( - await super().attr_is_online() and - self.get_peripheral().is_device_online(self.get_device_friendly_name()) + return await super().attr_is_online() and self.get_peripheral().is_device_online( + self.get_device_friendly_name() ) def get_device_friendly_name(self) -> str: @@ -53,9 +52,7 @@ async def set_state_property(self, property_path: list[str], value: Any) -> None assert len(property_path) > 0 for property_name in reversed(property_path): - value = { - property_name: value - } + value = {property_name: value} await self.get_peripheral().set_device_state(self.get_device_friendly_name(), value) @@ -74,18 +71,16 @@ async def set_config_property(self, property_path: list[str], value: Any) -> Non assert len(property_path) > 0 for property_name in reversed(property_path): - value = { - property_name: value - } + value = {property_name: value} await self.get_peripheral().set_device_config(self.get_device_friendly_name(), value) class PermitJoinPort(Zigbee2MQTTPort): - ID = 'permit_join' + ID = "permit_join" TYPE = core_ports.TYPE_BOOLEAN WRITABLE = True - DISPLAY_NAME = 'Permit Join' + DISPLAY_NAME = "Permit Join" PERSISTED = None async def read_value(self) -> NullablePortValue: @@ -101,18 +96,18 @@ def __init__( self, *, id: str, - display_name: str = '', + display_name: str = "", type: str, writable: bool, - unit: Optional[str] = None, - min: Optional[float] = None, - max: Optional[float] = None, + unit: str | None = None, + min: float | None = None, + max: float | None = None, property_path: list[str], storage: str, device_friendly_name: str, value_on: Any = True, value_off: Any = False, - values: Optional[list[str]] = None, + values: list[str] | None = None, **kwargs, ) -> None: super().__init__(id=id, device_friendly_name=device_friendly_name, **kwargs) @@ -121,35 +116,32 @@ def __init__( self._display_name: str = display_name self._type: str = type self._writable: bool = writable - self._unit: Optional[str] = unit - self._min: Optional[float] = min - self._max: Optional[float] = max + self._unit: str | None = unit + self._min: float | None = min + self._max: float | None = max self._property_path: list[str] = property_path self._storage: str = storage self._value_on: Any = value_on self._value_off: Any = value_off - self._values: Optional[list[str]] = values + self._values: list[str] | None = values if values: # This will determine the actual `choices` attribute value self._choices = [ - { - 'value': i + 1, - 'display_name': ' '.join(x.capitalize() for x in v.split('_')) - } + {"value": i + 1, "display_name": " ".join(x.capitalize() for x in v.split("_"))} for i, v in enumerate(values) ] self._values_dict: dict[str, int] = {v: i for i, v in enumerate(values)} async def read_value(self) -> NullablePortValue: - if self._storage == 'state': + if self._storage == "state": value = self.get_state_property(self.get_property_path()) else: value = self.get_config_property(self.get_property_path()) if await self.get_type() == core_ports.TYPE_BOOLEAN: - value = (value == self._value_on) + value = value == self._value_on elif self._values: # map Z2M value to choice try: value = self._values_dict[value] + 1 @@ -169,9 +161,9 @@ async def write_value(self, value: PortValue) -> None: try: value = self._values[int(value - 1)] except IndexError: - raise ValueError(f'Invalid choice: {value}') + raise ValueError(f"Invalid choice: {value}") - if self._storage == 'state': + if self._storage == "state": await self.set_state_property(self.get_property_path(), value) else: await self.set_config_property(self.get_property_path(), value) @@ -182,26 +174,26 @@ def get_property_path(self) -> list[str]: class DeviceControlPort(BaseDevicePort): ADDITIONAL_ATTRDEFS = { - 'friendly_name': { - 'display_name': 'Friendly Name', - 'description': 'Use this attribute to rename your Zigbee device. Set to empty string to remove the device.', - 'type': 'string', - 'modifiable': True, - 'persisted': False, + "friendly_name": { + "display_name": "Friendly Name", + "description": "Use this attribute to rename your Zigbee device. Set to empty string to remove the device.", + "type": "string", + "modifiable": True, + "persisted": False, + }, + "ieee_address": { + "display_name": "Address", + "description": "Zigbee IEEE Address", + "type": "string", + "modifiable": False, + "property_path": ["ieee_address"], + "storage": "config", }, - 'ieee_address': { - 'display_name': 'Address', - 'description': 'Zigbee IEEE Address', - 'type': 'string', - 'modifiable': False, - 'property_path': ['ieee_address'], - 'storage': 'config', - } } # Force `display_name` not persisted (by qToggleServer) as it's actually persisted by Z2M STANDARD_ATTRDEFS = copy.deepcopy(DevicePort.STANDARD_ATTRDEFS) - STANDARD_ATTRDEFS['display_name']['persisted'] = False + STANDARD_ATTRDEFS["display_name"]["persisted"] = False TYPE = core_ports.TYPE_BOOLEAN WRITABLE = True @@ -213,7 +205,7 @@ def __init__( self, *, id: str, - additional_attrdefs: Optional[AttributeDefinitions], + additional_attrdefs: AttributeDefinitions | None, device_friendly_name: str, **kwargs, ) -> None: @@ -225,7 +217,7 @@ async def get_additional_attrdefs(self) -> AttributeDefinitions: return self.ADDITIONAL_ATTRDEFS | self._additional_attrdefs async def enable_renamed_ports(self, enabled_port_ids: set[str], new_friendly_name: str, attempt: int = 1) -> None: - self.debug('enabling renamed ports: %s (attempt %d)', ', '.join(enabled_port_ids), attempt) + self.debug("enabling renamed ports: %s (attempt %d)", ", ".join(enabled_port_ids), attempt) ports: list[BaseDevicePort] = self.get_peripheral().get_device_ports(new_friendly_name) control_port = self.get_peripheral().get_control_port(new_friendly_name) @@ -234,11 +226,11 @@ async def enable_renamed_ports(self, enabled_port_ids: set[str], new_friendly_na if not ports: if attempt < self._MAX_RENAME_ATTEMPTS: - self.debug('renamed ports not added yet, retrying later') + self.debug("renamed ports not added yet, retrying later") await asyncio.sleep(1) await self.enable_renamed_ports(enabled_port_ids, new_friendly_name, attempt + 1) else: - self.error('timeout waiting for renamed ports to be added') + self.error("timeout waiting for renamed ports to be added") return for port in ports: @@ -266,8 +258,8 @@ async def attr_set_friendly_name(self, value: str) -> None: for port_id in all_enabled_port_ids: if port_id == current_safe_friendly_name: port_id = value - elif port_id.startswith(f'{current_safe_friendly_name}.'): - port_id = f'{value}.{port_id[len(current_safe_friendly_name) + 1:]}' + elif port_id.startswith(f"{current_safe_friendly_name}."): + port_id = f"{value}.{port_id[len(current_safe_friendly_name) + 1 :]}" else: continue device_enabled_port_ids.append(port_id) @@ -289,42 +281,42 @@ async def attr_set_friendly_name(self, value: str) -> None: asyncio_utils.await_later(1, self.get_peripheral().remove_device(current_friendly_name)) ) - async def attr_get_value(self, name: str) -> Optional[Attribute]: + async def attr_get_value(self, name: str) -> Attribute | None: attrdefs = await self.get_additional_attrdefs() attrdef = attrdefs.get(name) if not attrdef: return None - property_path = attrdef.get('property_path') + property_path = attrdef.get("property_path") if not property_path: return None # At this point we know this attribute is a Zigbee device property - if attrdef['storage'] == 'state': + if attrdef["storage"] == "state": value = self.get_state_property(property_path) else: value = self.get_config_property(property_path) if value is None: # Supply a default value if not found in state - if attrdef.get('type') == 'boolean': - value = attrdef.get('_value_off', False) - elif attrdef.get('_values'): # map Z2M value to choice - value = attrdef['_values'][0] # first value in enum - elif attrdef.get('min') is not None: - value = attrdef.get('min') - elif attrdef.get('max') is not None: - value = attrdef.get('max') + if attrdef.get("type") == "boolean": + value = attrdef.get("_value_off", False) + elif attrdef.get("_values"): # map Z2M value to choice + value = attrdef["_values"][0] # first value in enum + elif attrdef.get("min") is not None: + value = attrdef.get("min") + elif attrdef.get("max") is not None: + value = attrdef.get("max") else: value = 0 - if attrdef.get('type') == 'boolean': - value = (value == attrdef.get('_value_on', True)) - elif attrdef.get('_values'): # map Z2M value to choice + if attrdef.get("type") == "boolean": + value = value == attrdef.get("_value_on", True) + elif attrdef.get("_values"): # map Z2M value to choice try: - value = attrdef['_values_dict'][value] + 1 + value = attrdef["_values_dict"][value] + 1 except KeyError: - self.error('got an unexpected choice: %s', value) + self.error("got an unexpected choice: %s", value) value = None return value @@ -336,24 +328,24 @@ async def attr_set_value(self, name: str, value: Attribute) -> None: return # Ensure this is a Zigbee property - if not attrdef.get('property_path'): + if not attrdef.get("property_path"): return - if attrdef.get('type') == 'boolean': + if attrdef.get("type") == "boolean": if value: - value = attrdef.get('_value_on', True) + value = attrdef.get("_value_on", True) else: - value = attrdef.get('_value_off', False) - elif attrdef.get('_values'): # map choice to Z2M value + value = attrdef.get("_value_off", False) + elif attrdef.get("_values"): # map choice to Z2M value try: - value = attrdef.get('_values', [])[int(value - 1)] + value = attrdef.get("_values", [])[int(value - 1)] except IndexError: - raise ValueError(f'Invalid choice: {value}') + raise ValueError(f"Invalid choice: {value}") - if attrdef['storage'] == 'state': - await self.set_state_property(attrdef['property_path'], value) + if attrdef["storage"] == "state": + await self.set_state_property(attrdef["property_path"], value) else: - await self.set_config_property(attrdef['property_path'], value) + await self.set_config_property(attrdef["property_path"], value) async def read_value(self) -> NullablePortValue: return self.get_peripheral().is_device_enabled(self.get_device_friendly_name()) @@ -363,19 +355,19 @@ async def write_value(self, value: bool) -> None: await self.get_peripheral().set_device_enabled(self.get_device_friendly_name(), value) async def attr_get_display_name(self) -> str: - config_description = self.get_config_property(['description']) + config_description = self.get_config_property(["description"]) if config_description is not None: return config_description info = self.get_peripheral().get_device_info(self.get_device_friendly_name()) or {} - info_description = info.get('definition', {}).get('description') + info_description = info.get("definition", {}).get("description") if info_description is not None: return info_description - return '' + return "" async def attr_set_display_name(self, value: str) -> None: - await self.set_config_property(['description'], value) + await self.set_config_property(["description"], value) async def handle_enable(self) -> None: await super().handle_enable() diff --git a/setup.py b/setup.py deleted file mode 100644 index 6a43134..0000000 --- a/setup.py +++ /dev/null @@ -1,17 +0,0 @@ -from setuptools import setup, find_namespace_packages - - -setup( - name='qtoggleserver-zigbee2mqtt', - version='unknown-version', - description='qToggleServer integration with Zigbee2MQTT', - author='Calin Crisan', - author_email='ccrisan@gmail.com', - license='Apache 2.0', - - packages=find_namespace_packages(), - - install_requires=[ - 'aiomqtt>=2.0', - ] -)