|
| 1 | +"""Sunricher remote device.""" |
| 2 | + |
| 3 | +import logging |
| 4 | +from typing import Any, Final, NamedTuple, Optional, Union |
| 5 | + |
| 6 | +from zigpy.quirks.v2 import QuirkBuilder |
| 7 | +import zigpy.types as t |
| 8 | +from zigpy.zcl import foundation |
| 9 | + |
| 10 | +from zhaquirks import CustomCluster |
| 11 | +from zhaquirks.const import ( |
| 12 | + BUTTON, |
| 13 | + COMMAND, |
| 14 | + DOUBLE_PRESS, |
| 15 | + LONG_PRESS, |
| 16 | + LONG_RELEASE, |
| 17 | + PRESS_TYPE, |
| 18 | + SHORT_PRESS, |
| 19 | + ZHA_SEND_EVENT, |
| 20 | +) |
| 21 | + |
| 22 | +_LOGGER = logging.getLogger(__name__) |
| 23 | +SUNRICHER = "Sunricher" |
| 24 | + |
| 25 | + |
| 26 | +class Button(NamedTuple): |
| 27 | + """Button class.""" |
| 28 | + |
| 29 | + id: int |
| 30 | + action: str |
| 31 | + trigger: str |
| 32 | + |
| 33 | + |
| 34 | +class PressType(NamedTuple): |
| 35 | + """Button press type.""" |
| 36 | + |
| 37 | + name: str |
| 38 | + action: str |
| 39 | + trigger: str = None |
| 40 | + |
| 41 | + |
| 42 | +class ZG9002KR12ProRemoteCluster(CustomCluster): |
| 43 | + """Sunricher manufacturer specific cluster for remote.""" |
| 44 | + |
| 45 | + cluster_id: Final[t.uint16_t] = 0xFF03 |
| 46 | + name: Final = "Sunricher remote cluster" |
| 47 | + ep_attribute: Final = "sunricher_remote_cluster" |
| 48 | + |
| 49 | + # Button mapping |
| 50 | + BUTTONS: dict[int, Button] = { |
| 51 | + 1: Button(1, "k1", "K1"), |
| 52 | + 2: Button(2, "k2", "K2"), |
| 53 | + 3: Button(3, "k3", "K3"), |
| 54 | + 4: Button(4, "k4", "K4"), |
| 55 | + 5: Button(5, "k5", "K5"), |
| 56 | + 6: Button(6, "k6", "K6"), |
| 57 | + 7: Button(7, "k7", "K7"), |
| 58 | + 8: Button(8, "k8", "K8"), |
| 59 | + 9: Button(9, "knob", "Knob"), |
| 60 | + 11: Button(11, "k9", "K9"), |
| 61 | + 12: Button(12, "k10", "K10"), |
| 62 | + 15: Button(15, "k11", "K11"), |
| 63 | + 16: Button(16, "k12", "K12"), |
| 64 | + } |
| 65 | + |
| 66 | + # Press types |
| 67 | + PRESS_TYPES: dict[int, PressType] = { |
| 68 | + 1: PressType(SHORT_PRESS, "short_press", "Short Press"), |
| 69 | + 2: PressType(DOUBLE_PRESS, "double_press", "Double Press"), |
| 70 | + 3: PressType(LONG_PRESS, "hold", "Hold"), |
| 71 | + 4: PressType(LONG_RELEASE, "hold_released", "Hold Released"), |
| 72 | + } |
| 73 | + |
| 74 | + # Knob directions |
| 75 | + KNOB_DIRECTIONS: dict[int, PressType] = { |
| 76 | + 1: PressType("clockwise", "clockwise_rotation", "Clockwise Rotation"), |
| 77 | + 2: PressType( |
| 78 | + "anti_clockwise", "anti_clockwise_rotation", "Anti-clockwise Rotation" |
| 79 | + ), |
| 80 | + } |
| 81 | + |
| 82 | + def handle_cluster_request( |
| 83 | + self, |
| 84 | + hdr: foundation.ZCLHeader, |
| 85 | + args: list[Any], |
| 86 | + *, |
| 87 | + dst_addressing: Optional[ |
| 88 | + Union[t.Addressing.Group, t.Addressing.IEEE, t.Addressing.NWK] |
| 89 | + ] = None, |
| 90 | + ): |
| 91 | + """Handle the cluster command.""" |
| 92 | + |
| 93 | + message_type = args[0] |
| 94 | + if message_type == 0x01: |
| 95 | + # Button press event |
| 96 | + press_type_mask = args[3] |
| 97 | + button_mask = (args[1] << 8) | args[2] |
| 98 | + |
| 99 | + press_type = self.PRESS_TYPES.get(press_type_mask) |
| 100 | + action_buttons = [] |
| 101 | + for i in range(16): |
| 102 | + if (button_mask >> i) & 1: |
| 103 | + button_id = i + 1 |
| 104 | + button = self.BUTTONS.get(button_id) |
| 105 | + action_buttons.append(button) |
| 106 | + |
| 107 | + _LOGGER.debug( |
| 108 | + "Button event: action=%s, action_buttons=%s", |
| 109 | + press_type.action, |
| 110 | + [b.action for b in action_buttons], |
| 111 | + ) |
| 112 | + |
| 113 | + for button in action_buttons: |
| 114 | + action = f"{button.action}_{press_type.action}" |
| 115 | + event_data = { |
| 116 | + BUTTON: button.id, |
| 117 | + PRESS_TYPE: press_type.action, |
| 118 | + COMMAND: "button_press", |
| 119 | + } |
| 120 | + self.listener_event(ZHA_SEND_EVENT, action, event_data) |
| 121 | + |
| 122 | + elif message_type == 0x03: |
| 123 | + # Knob rotation event |
| 124 | + direction_mask = args[1] |
| 125 | + action_speed = args[3] |
| 126 | + direction = self.KNOB_DIRECTIONS.get(direction_mask) |
| 127 | + |
| 128 | + _LOGGER.debug( |
| 129 | + "Knob event: action=%s, action_speed=%s", direction.action, action_speed |
| 130 | + ) |
| 131 | + |
| 132 | + event_data = { |
| 133 | + BUTTON: 9, # knob |
| 134 | + PRESS_TYPE: direction.action, |
| 135 | + "speed": action_speed, |
| 136 | + } |
| 137 | + self.listener_event(ZHA_SEND_EVENT, direction.action, event_data) |
| 138 | + |
| 139 | + @classmethod |
| 140 | + def generate_device_automation_triggers(cls): |
| 141 | + """Generate automation triggers based on device buttons and press-types.""" |
| 142 | + triggers = {} |
| 143 | + # Generate button triggers |
| 144 | + for button in cls.BUTTONS.values(): |
| 145 | + for press_type in cls.PRESS_TYPES.values(): |
| 146 | + triggers[(press_type.trigger, button.trigger)] = { |
| 147 | + COMMAND: f"{button.action}_{press_type.action}" |
| 148 | + } |
| 149 | + |
| 150 | + # Generate knob triggers |
| 151 | + button = cls.BUTTONS.get(9) # knob |
| 152 | + if button: |
| 153 | + for direction in cls.KNOB_DIRECTIONS.values(): |
| 154 | + triggers[(direction.trigger, button.trigger)] = { |
| 155 | + COMMAND: direction.action |
| 156 | + } |
| 157 | + |
| 158 | + _LOGGER.debug("Generated triggers: %s", triggers) |
| 159 | + return triggers |
| 160 | + |
| 161 | + |
| 162 | +( |
| 163 | + QuirkBuilder(SUNRICHER, "HK-ZRC-K12&RS-E") |
| 164 | + .friendly_name( |
| 165 | + model="SR-ZG9002KR12-Pro", |
| 166 | + manufacturer=SUNRICHER, |
| 167 | + ) |
| 168 | + .replaces(ZG9002KR12ProRemoteCluster, endpoint_id=1) |
| 169 | + .device_automation_triggers( |
| 170 | + ZG9002KR12ProRemoteCluster.generate_device_automation_triggers() |
| 171 | + ) |
| 172 | + .add_to_registry() |
| 173 | +) |
0 commit comments