From 73e4012d7e047943d4b4b99071370a84adbabfce Mon Sep 17 00:00:00 2001 From: puddly <32534428+puddly@users.noreply.github.com> Date: Sun, 21 Nov 2021 22:26:24 -0500 Subject: [PATCH 1/4] Speed up bootloader skipping and support CC2530 --- tests/api/test_connect.py | 90 ++++++++++++++++++---- tests/api/test_request.py | 22 +++--- tests/api/test_response.py | 14 ++-- tests/application/test_connect.py | 7 +- tests/conftest.py | 18 ++--- tests/test_uart.py | 48 +----------- zigpy_znp/api.py | 122 ++++++++++++++++++++---------- zigpy_znp/uart.py | 52 +++++++------ zigpy_znp/utils.py | 11 +++ 9 files changed, 227 insertions(+), 157 deletions(-) diff --git a/tests/api/test_connect.py b/tests/api/test_connect.py index b47143dc..acb60cfc 100644 --- a/tests/api/test_connect.py +++ b/tests/api/test_connect.py @@ -2,40 +2,98 @@ import pytest +import zigpy_znp.types as t +import zigpy_znp.commands as c from zigpy_znp.api import ZNP -from ..conftest import FAKE_SERIAL_PORT, BaseServerZNP, config_for_port_path +from ..conftest import BaseServerZNP, CoroutineMock, config_for_port_path pytestmark = [pytest.mark.asyncio] -async def test_connect_no_communication(connected_znp): - znp, znp_server = connected_znp +async def test_connect_no_test(make_znp_server): + znp_server = make_znp_server(server_cls=BaseServerZNP) + znp = ZNP(config_for_port_path(znp_server.port_path)) + await znp.connect(test_port=False) + + # Nothing will be sent assert znp_server._uart.data_received.call_count == 0 + znp.close() + -async def test_connect_skip_bootloader(make_znp_server): +@pytest.mark.parametrize("work_after_attempt", [1, 2, 3]) +async def test_connect_skip_bootloader(make_znp_server, mocker, work_after_attempt): znp_server = make_znp_server(server_cls=BaseServerZNP) - znp = ZNP(config_for_port_path(FAKE_SERIAL_PORT)) + znp = ZNP(config_for_port_path(znp_server.port_path)) - await znp.connect(test_port=False) + mocker.patch.object(znp.nvram, "determine_alignment", new=CoroutineMock()) + mocker.patch.object(znp, "detect_zstack_version", new=CoroutineMock()) + + num_pings = 0 + + def ping_rsp(req): + nonlocal num_pings + num_pings += 1 + + # Ignore the first few pings + if num_pings >= work_after_attempt: + return c.SYS.Ping.Rsp(Capabilities=t.MTCapabilities.SYS) - # Nothing should have been sent except for bootloader skip bytes - # NOTE: `c[-2][0]` is `c.args[0]`, just compatible with all Python versions - data_written = b"".join(c[-2][0] for c in znp_server._uart.data_received.mock_calls) - assert set(data_written) == {0xEF} - assert len(data_written) >= 167 + znp_server.reply_to(c.SYS.Ping.Req(), responses=[ping_rsp]) + + await znp.connect(test_port=True) znp.close() -async def wait_for_spy(spy): - while True: - if spy.called: - return +async def test_connect_skip_bootloader_batched_rsp(make_znp_server, mocker): + znp_server = make_znp_server(server_cls=BaseServerZNP) + znp = ZNP(config_for_port_path(znp_server.port_path)) + + mocker.patch.object(znp.nvram, "determine_alignment", new=CoroutineMock()) + mocker.patch.object(znp, "detect_zstack_version", new=CoroutineMock()) + + num_pings = 0 + + def ping_rsp(req): + nonlocal num_pings + num_pings += 1 + + if num_pings == 3: + # CC253x radios sometimes buffer requests until they send a `ResetInd` + return ( + [ + c.SYS.ResetInd.Callback( + Reason=t.ResetReason.PowerUp, + TransportRev=0x00, + ProductId=0x12, + MajorRel=0x01, + MinorRel=0x02, + MaintRel=0x03, + ) + ] + + [c.SYS.Ping.Rsp(Capabilities=t.MTCapabilities.SYS)] * num_pings, + ) + elif num_pings >= 3: + return c.SYS.Ping.Rsp(Capabilities=t.MTCapabilities.SYS) + + znp_server.reply_to(c.SYS.Ping.Req(), responses=[ping_rsp]) + + await znp.connect(test_port=True) + + znp.close() + - await asyncio.sleep(0.01) +async def test_connect_skip_bootloader_failure(make_znp_server, mocker): + znp_server = make_znp_server(server_cls=BaseServerZNP) + znp = ZNP(config_for_port_path(znp_server.port_path)) + + with pytest.raises(asyncio.TimeoutError): + await znp.connect(test_port=True) + + znp.close() async def test_api_close(connected_znp, mocker): diff --git a/tests/api/test_request.py b/tests/api/test_request.py index cbf0a90d..debe472c 100644 --- a/tests/api/test_request.py +++ b/tests/api/test_request.py @@ -52,19 +52,19 @@ async def test_cleanup_timeout_internal(connected_znp): znp._config[conf.CONF_ZNP_CONFIG][conf.CONF_SREQ_TIMEOUT] = 0.1 znp._config[conf.CONF_ZNP_CONFIG][conf.CONF_ARSP_TIMEOUT] = 0.1 - assert not znp._listeners + assert not any(znp._listeners.values()) with pytest.raises(asyncio.TimeoutError): await znp.request(c.UTIL.TimeAlive.Req()) # We should be cleaned up - assert not znp._listeners + assert not any(znp._listeners.values()) async def test_cleanup_timeout_external(connected_znp): znp, znp_server = connected_znp - assert not znp._listeners + assert not any(znp._listeners.values()) # This request will timeout because we didn't send anything back with pytest.raises(asyncio.TimeoutError): @@ -72,13 +72,13 @@ async def test_cleanup_timeout_external(connected_znp): await znp.request(c.UTIL.TimeAlive.Req()) # We should be cleaned up - assert not znp._listeners + assert not any(znp._listeners.values()) async def test_callback_rsp_cleanup_timeout_external(connected_znp): znp, znp_server = connected_znp - assert not znp._listeners + assert not any(znp._listeners.values()) # This request will timeout because we didn't send anything back with pytest.raises(asyncio.TimeoutError): @@ -89,7 +89,7 @@ async def test_callback_rsp_cleanup_timeout_external(connected_znp): ) # We should be cleaned up - assert not znp._listeners + assert not any(znp._listeners.values()) @pytest.mark.parametrize("background", [False, True]) @@ -98,7 +98,7 @@ async def test_callback_rsp_cleanup_timeout_internal(background, connected_znp): znp._config[conf.CONF_ZNP_CONFIG][conf.CONF_SREQ_TIMEOUT] = 0.1 znp._config[conf.CONF_ZNP_CONFIG][conf.CONF_ARSP_TIMEOUT] = 0.1 - assert not znp._listeners + assert not any(znp._listeners.values()) # This request will timeout because we didn't send anything back with pytest.raises(asyncio.TimeoutError): @@ -109,7 +109,7 @@ async def test_callback_rsp_cleanup_timeout_internal(background, connected_znp): ) # We should be cleaned up - assert not znp._listeners + assert not any(znp._listeners.values()) async def test_callback_rsp_background_timeout(connected_znp, mocker): @@ -146,7 +146,7 @@ async def replier(req): await reply # We should be cleaned up - assert not znp._listeners + assert not any(znp._listeners.values()) # Command was properly handled assert len(znp._unhandled_command.mock_calls) == 0 @@ -157,7 +157,7 @@ async def test_callback_rsp_cleanup_concurrent(connected_znp, event_loop, mocker mocker.spy(znp, "_unhandled_command") - assert not znp._listeners + assert not any(znp._listeners.values()) def send_responses(): znp_server.send(c.UTIL.TimeAlive.Rsp(Seconds=123)) @@ -173,7 +173,7 @@ def send_responses(): ) # We should be cleaned up - assert not znp._listeners + assert not any(znp._listeners.values()) assert callback_rsp == c.SYS.OSALTimerExpired.Callback(Id=0xAB) diff --git a/tests/api/test_response.py b/tests/api/test_response.py index 52d97164..059f92a7 100644 --- a/tests/api/test_response.py +++ b/tests/api/test_response.py @@ -13,11 +13,11 @@ async def test_responses(connected_znp): znp, znp_server = connected_znp - assert not znp._listeners + assert not any(znp._listeners.values()) future = znp.wait_for_response(c.SYS.Ping.Rsp(partial=True)) - assert znp._listeners + assert any(znp._listeners.values()) response = c.SYS.Ping.Rsp(Capabilities=t.MTCapabilities.SYS) znp_server.send(response) @@ -26,13 +26,13 @@ async def test_responses(connected_znp): # Our listener will have been cleaned up after a step await asyncio.sleep(0) - assert not znp._listeners + assert not any(znp._listeners.values()) async def test_responses_multiple(connected_znp): znp, _ = connected_znp - assert not znp._listeners + assert not any(znp._listeners.values()) future1 = znp.wait_for_response(c.SYS.Ping.Rsp(partial=True)) future2 = znp.wait_for_response(c.SYS.Ping.Rsp(partial=True)) @@ -49,7 +49,7 @@ async def test_responses_multiple(connected_znp): assert not future2.done() assert not future3.done() - assert znp._listeners + assert any(znp._listeners.values()) async def test_response_timeouts(connected_znp): @@ -68,7 +68,7 @@ async def send_soon(delay): # The response was successfully received so we should have no outstanding listeners await asyncio.sleep(0) - assert not znp._listeners + assert not any(znp._listeners.values()) asyncio.create_task(send_soon(0.6)) @@ -80,7 +80,7 @@ async def send_soon(delay): # Our future still completed, albeit unsuccessfully. # We should have no leaked listeners here. - assert not znp._listeners + assert not any(znp._listeners.values()) async def test_response_matching_partial(connected_znp): diff --git a/tests/application/test_connect.py b/tests/application/test_connect.py index f7b8b051..cc1d140f 100644 --- a/tests/application/test_connect.py +++ b/tests/application/test_connect.py @@ -60,7 +60,7 @@ async def test_probe_unsuccessful(): @pytest.mark.parametrize("device", FORMED_DEVICES) async def test_probe_unsuccessful_slow(device, make_znp_server, mocker): - znp_server = make_znp_server(server_cls=device) + znp_server = make_znp_server(server_cls=device, shorten_delays=False) # Don't respond to anything znp_server._listeners.clear() @@ -78,7 +78,7 @@ async def test_probe_unsuccessful_slow(device, make_znp_server, mocker): @pytest.mark.parametrize("device", FORMED_DEVICES) async def test_probe_successful(device, make_znp_server): - znp_server = make_znp_server(server_cls=device) + znp_server = make_znp_server(server_cls=device, shorten_delays=False) assert await ControllerApplication.probe( conf.SCHEMA_DEVICE({conf.CONF_DEVICE_PATH: znp_server.serial_port}) @@ -89,7 +89,7 @@ async def test_probe_successful(device, make_znp_server): @pytest.mark.parametrize("device", FORMED_DEVICES) async def test_probe_multiple(device, make_znp_server): # Make sure that our listeners don't get cleaned up after each probe - znp_server = make_znp_server(server_cls=device) + znp_server = make_znp_server(server_cls=device, shorten_delays=False) znp_server.close = lambda: None config = conf.SCHEMA_DEVICE({conf.CONF_DEVICE_PATH: znp_server.serial_port}) @@ -112,6 +112,7 @@ async def test_reconnect(device, event_loop, make_application): conf.CONF_SREQ_TIMEOUT: 0.1, } }, + shorten_delays=False, ) # Start up the server diff --git a/tests/conftest.py b/tests/conftest.py index 20709b7b..12e5e011 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -81,14 +81,14 @@ def config_for_port_path(path): async def make_znp_server(mocker): transports = [] - mocker.patch("zigpy_znp.api.AFTER_CONNECT_DELAY", 0.001) - mocker.patch("zigpy_znp.api.STARTUP_DELAY", 0.001) - mocker.patch("zigpy_znp.uart.RTS_TOGGLE_DELAY", 0) - - def inner(server_cls, config=None): + def inner(server_cls, config=None, shorten_delays=True): if config is None: config = config_for_port_path(FAKE_SERIAL_PORT) + if shorten_delays: + mocker.patch("zigpy_znp.api.AFTER_BOOTLOADER_SKIP_BYTE_DELAY", 0.001) + mocker.patch("zigpy_znp.api.RTS_TOGGLE_DELAY", 0.001) + server = server_cls(config) server._transports = transports @@ -154,8 +154,6 @@ async def inner(server_cls): } ) - mocker.patch("zigpy_znp.api.STARTUP_DELAY", 0) - znp = ZNP(config) znp_server = make_znp_server(server_cls=server_cls) @@ -218,7 +216,7 @@ def swap_attribute(obj, name, value): @pytest.fixture def make_application(make_znp_server): - def inner(server_cls, client_config=None, server_config=None): + def inner(server_cls, client_config=None, server_config=None, **kwargs): default = config_for_port_path(FAKE_SERIAL_PORT) client_config = merge_dicts(default, client_config or {}) @@ -255,7 +253,9 @@ def add_initialized_device(self, *args, **kwargs): app.add_initialized_device = add_initialized_device.__get__(app) - return app, make_znp_server(server_cls=server_cls, config=server_config) + return app, make_znp_server( + server_cls=server_cls, config=server_config, **kwargs + ) return inner diff --git a/tests/test_uart.py b/tests/test_uart.py index 8568474c..922d7c36 100644 --- a/tests/test_uart.py +++ b/tests/test_uart.py @@ -230,7 +230,7 @@ async def test_connection_lost(dummy_serial_conn, mocker, event_loop): znp.connection_lost = conn_lost_fut.set_result protocol = await znp_uart.connect( - conf.SCHEMA_DEVICE({conf.CONF_DEVICE_PATH: device}), api=znp, toggle_rts=False + conf.SCHEMA_DEVICE({conf.CONF_DEVICE_PATH: device}), api=znp ) exception = RuntimeError("Uh oh, something broke") @@ -245,50 +245,6 @@ async def test_connection_made(dummy_serial_conn, mocker): device, _ = dummy_serial_conn znp = mocker.Mock() - await znp_uart.connect( - conf.SCHEMA_DEVICE({conf.CONF_DEVICE_PATH: device}), api=znp, toggle_rts=False - ) + await znp_uart.connect(conf.SCHEMA_DEVICE({conf.CONF_DEVICE_PATH: device}), api=znp) znp.connection_made.assert_called_once_with() - - -@pytest.mark.asyncio -async def test_no_toggle_rts(dummy_serial_conn, mocker): - device, serial = dummy_serial_conn - - type(serial).dtr = dtr = mocker.PropertyMock() - type(serial).rts = rts = mocker.PropertyMock() - - znp = mocker.Mock() - - await znp_uart.connect( - conf.SCHEMA_DEVICE({conf.CONF_DEVICE_PATH: device}), api=znp, toggle_rts=False - ) - - assert dtr.mock_calls == [] - assert rts.mock_calls == [] - - -@pytest.mark.asyncio -async def test_toggle_rts(dummy_serial_conn, mocker): - device, serial = dummy_serial_conn - - type(serial).dtr = dtr = mocker.PropertyMock() - type(serial).rts = rts = mocker.PropertyMock() - - znp = mocker.Mock() - - await znp_uart.connect( - conf.SCHEMA_DEVICE({conf.CONF_DEVICE_PATH: device}), api=znp, toggle_rts=True - ) - - assert dtr.mock_calls == [ - mocker.call(False), - mocker.call(False), - mocker.call(False), - ] - assert rts.mock_calls == [ - mocker.call(False), - mocker.call(True), - mocker.call(False), - ] diff --git a/zigpy_znp/api.py b/zigpy_znp/api.py index 6135b5c1..30897f77 100644 --- a/zigpy_znp/api.py +++ b/zigpy_znp/api.py @@ -21,6 +21,7 @@ from zigpy_znp import uart from zigpy_znp.nvram import NVRAMHelper from zigpy_znp.utils import ( + CatchAllResponse, BaseResponseListener, OneShotResponseListener, CallbackResponseListener, @@ -31,10 +32,13 @@ LOGGER = logging.getLogger(__name__) + # All of these are in seconds -AFTER_CONNECT_DELAY = 1 -STARTUP_DELAY = 1 +AFTER_BOOTLOADER_SKIP_BYTE_DELAY = 2.5 NETWORK_COMMISSIONING_TIMEOUT = 30 +RTS_TOGGLE_DELAY = 0.15 +CONNECT_PING_TIMEOUT = 0.50 +CONNECT_PROBE_TIMEOUT = 5.0 class ZNP: @@ -454,6 +458,73 @@ async def reset(self) -> None: def _port_path(self) -> str: return self._config[conf.CONF_DEVICE][conf.CONF_DEVICE_PATH] + async def _skip_bootloader(self) -> c.SYS.Ping.Rsp: + """ + Attempt to skip the bootloader and return the ping response. + """ + + async def ping_task(): + # First, just try pinging + try: + async with async_timeout.timeout(CONNECT_PING_TIMEOUT): + return await self.request(c.SYS.Ping.Req()) + except asyncio.TimeoutError: + pass + + # If that doesn't work, send the bootloader skip bytes and try again + self._uart.write(256 * bytes([c.ubl.BootloaderRunMode.FORCE_RUN])) + + await asyncio.sleep(AFTER_BOOTLOADER_SKIP_BYTE_DELAY) + + try: + async with async_timeout.timeout(CONNECT_PING_TIMEOUT): + return await self.request(c.SYS.Ping.Req()) + except asyncio.TimeoutError: + pass + + # Finally, toggle the RTS pin to skip Slaesh's bootloader + LOGGER.debug("Toggling RTS pin to skip CC2652R bootloader") + + self._uart.set_dtr_rts(dtr=False, rts=False) + await asyncio.sleep(RTS_TOGGLE_DELAY) + self._uart.set_dtr_rts(dtr=False, rts=True) + await asyncio.sleep(RTS_TOGGLE_DELAY) + self._uart.set_dtr_rts(dtr=False, rts=False) + await asyncio.sleep(RTS_TOGGLE_DELAY) + + # At this point we have nothing else to try, don't catch the timeout + async with async_timeout.timeout(CONNECT_PING_TIMEOUT): + return await self.request(c.SYS.Ping.Req()) + + async with self.capture_responses([CatchAllResponse()]) as responses: + ping_task = asyncio.create_task(ping_task()) + + try: + async with async_timeout.timeout(CONNECT_PROBE_TIMEOUT): + result = await responses.get() + except Exception: + ping_task.cancel() + raise + else: + LOGGER.debug("Radio is alive: %s", result) + + # Give the ping task a little bit extra time to finish. Radios often queue + # requests so when the reset indication is received, they will all be + # immediately answered + if not ping_task.done(): + LOGGER.debug("Giving ping task %0.2fs to finish", CONNECT_PING_TIMEOUT) + + try: + async with async_timeout.timeout(CONNECT_PING_TIMEOUT): + result = await ping_task + except asyncio.TimeoutError: + ping_task.cancel() + + if isinstance(result, c.SYS.Ping.Rsp): + return result + + return await self.request(c.SYS.Ping.Req()) + async def connect(self, *, test_port=True) -> None: """ Connects to the device specified by the "device" section of the config dict. @@ -468,40 +539,11 @@ async def connect(self, *, test_port=True) -> None: try: self._uart = await uart.connect(self._config[conf.CONF_DEVICE], self) - LOGGER.debug("Waiting %ss before sending anything", AFTER_CONNECT_DELAY) - await asyncio.sleep(AFTER_CONNECT_DELAY) - - if self._config[conf.CONF_ZNP_CONFIG][conf.CONF_SKIP_BOOTLOADER]: - LOGGER.debug("Sending bootloader skip byte") - - # XXX: Z-Stack locks up if other radios try probing it first. - # Writing the bootloader skip byte a bunch of times (at least 167) - # appears to reset it. - skip = bytes([c.ubl.BootloaderRunMode.FORCE_RUN]) - self._uart._transport_write(skip * 256) - - # We have to disable all non-bootloader commands to enter the serial - # bootloader upon connecting to the UART. + # To allow the ZNP interface to be used for bootloader commands, we have to + # prevent any data from being sent if test_port: - # Some Z-Stack 3 devices don't like you sending data immediately after - # opening the serial port. A small delay helps, but they also sometimes - # send a reset indication message when they're ready. - LOGGER.debug( - "Waiting %ss or until a reset indication is received", STARTUP_DELAY - ) - - try: - async with async_timeout.timeout(STARTUP_DELAY): - await self.wait_for_response( - c.SYS.ResetInd.Callback(partial=True) - ) - except asyncio.TimeoutError: - pass - - LOGGER.debug("Testing connection to %s", self._port_path) - - # Make sure that our port works - self.capabilities = (await self.request(c.SYS.Ping.Req())).Capabilities + # The reset indication callback is sent when some sticks start up + self.capabilities = (await self._skip_bootloader()).Capabilities # We need to know how structs are packed to deserialize frames correctly await self.nvram.determine_alignment() @@ -513,11 +555,7 @@ async def connect(self, *, test_port=True) -> None: self.close() raise - LOGGER.debug( - "Connected to %s at %s baud", - self._uart._transport.serial.name, - self._uart._transport.serial.baudrate, - ) + LOGGER.debug("Connected to %s at %s baud", self._uart.name, self._uart.baudrate) def connection_made(self) -> None: """ @@ -625,7 +663,9 @@ def frame_received(self, frame: GeneralFrame) -> bool: matched = False one_shot_matched = False - for listener in self._listeners[command.header]: + for listener in ( + self._listeners[command.header] + self._listeners[CatchAllResponse.header] + ): # XXX: A single response should *not* resolve multiple one-shot listeners! # `future.add_done_callback` doesn't remove our listeners synchronously # so doesn't prevent this from happening. diff --git a/zigpy_znp/uart.py b/zigpy_znp/uart.py index 0d4bf8af..4eb97faf 100644 --- a/zigpy_znp/uart.py +++ b/zigpy_znp/uart.py @@ -22,7 +22,6 @@ LOGGER = logging.getLogger(__name__) -RTS_TOGGLE_DELAY = 0.15 # seconds class BufferTooShort(Exception): @@ -87,12 +86,29 @@ def data_received(self, data: bytes) -> None: def send(self, payload: frames.GeneralFrame) -> None: """Sends data taking care of framing.""" - self._transport_write(frames.TransportFrame(payload).serialize()) + self.write(frames.TransportFrame(payload).serialize()) + + def write(self, data: bytes) -> None: + """ + Writes raw bytes to the transport. This method should be used instead of + directly writing to the transport with `transport.write`. + """ - def _transport_write(self, data: bytes) -> None: LOGGER.log(log.TRACE, "Sending data: %s", Bytes.__repr__(data)) self._transport.write(data) + def set_dtr_rts(self, *, dtr: bool, rts: bool) -> None: + self._transport.serial.dtr = dtr + self._transport.serial.rts = rts + + @property + def name(self) -> str: + return self._transport.serial.name + + @property + def baudrate(self) -> int: + return self._transport.serial.baudrate + def _extract_frames(self) -> typing.Iterator[frames.TransportFrame]: """Extracts frames from the buffer until it is exhausted.""" while True: @@ -143,10 +159,16 @@ def _extract_frame(self) -> frames.TransportFrame: return frame def __repr__(self) -> str: - return f"<{type(self).__name__} for {self._api}>" + return ( + f"<" + f"{type(self).__name__} connected to {self.name!r}" + f" at {self.baudrate} baud" + f" (api: {self._api})" + f">" + ) -async def connect(config: conf.ConfigType, api, *, toggle_rts=True) -> ZnpMtProtocol: +async def connect(config: conf.ConfigType, api) -> ZnpMtProtocol: loop = asyncio.get_running_loop() port = config[conf.CONF_DEVICE_PATH] @@ -155,7 +177,7 @@ async def connect(config: conf.ConfigType, api, *, toggle_rts=True) -> ZnpMtProt LOGGER.debug("Connecting to %s at %s baud", port, baudrate) - transport, protocol = await serial_asyncio.create_serial_connection( + _, protocol = await serial_asyncio.create_serial_connection( loop=loop, protocol_factory=lambda: ZnpMtProtocol(api), url=port, @@ -168,24 +190,6 @@ async def connect(config: conf.ConfigType, api, *, toggle_rts=True) -> ZnpMtProt await protocol._connected_event.wait() - # Skips the bootloader on slaesh's CC2652R USB stick - if toggle_rts: - LOGGER.debug("Toggling RTS/CTS to skip CC2652R bootloader") - transport.serial.dtr = False - transport.serial.rts = False - - await asyncio.sleep(RTS_TOGGLE_DELAY) - - transport.serial.dtr = False - transport.serial.rts = True - - await asyncio.sleep(RTS_TOGGLE_DELAY) - - transport.serial.dtr = False - transport.serial.rts = False - - await asyncio.sleep(RTS_TOGGLE_DELAY) - LOGGER.debug("Connected to %s at %s baud", port, baudrate) return protocol diff --git a/zigpy_znp/utils.py b/zigpy_znp/utils.py index 66447c89..d0e27c15 100644 --- a/zigpy_znp/utils.py +++ b/zigpy_znp/utils.py @@ -153,6 +153,17 @@ def cancel(self): return False +class CatchAllResponse: + """ + Response-like object that matches every request. + """ + + header = object() # sentinel + + def matches(self, other) -> bool: + return True + + def combine_concurrent_calls(function): """ Decorator that allows concurrent calls to expensive coroutines to share a result. From 537b4a444721a9137be9fed9e23d0acc009def36 Mon Sep 17 00:00:00 2001 From: puddly <32534428+puddly@users.noreply.github.com> Date: Thu, 25 Nov 2021 17:02:25 -0500 Subject: [PATCH 2/4] Make RTS and DTR pin states configurable --- README.md | 4 ++++ tests/api/test_connect.py | 31 ++++++++++++++++++++++++++++++- tests/conftest.py | 14 ++++++++------ zigpy_znp/api.py | 33 +++++++++++++++++++-------------- zigpy_znp/config.py | 25 ++++++++++++++++++++++++- zigpy_znp/uart.py | 2 ++ 6 files changed, 87 insertions(+), 22 deletions(-) diff --git a/README.md b/README.md index d147d9fa..17e5927a 100644 --- a/README.md +++ b/README.md @@ -78,6 +78,10 @@ zha: # Delay between auto-reconnect attempts in case the device gets disconnected auto_reconnect_retry_delay: 5 + + # Pin states for skipping the bootloader + connect_rts_pin_states: [off, on, off] + connect_dtr_pin_states: [off, off, off] ``` # Tools diff --git a/tests/api/test_connect.py b/tests/api/test_connect.py index acb60cfc..2f30817a 100644 --- a/tests/api/test_connect.py +++ b/tests/api/test_connect.py @@ -1,4 +1,5 @@ import asyncio +from unittest.mock import call import pytest @@ -86,7 +87,7 @@ def ping_rsp(req): znp.close() -async def test_connect_skip_bootloader_failure(make_znp_server, mocker): +async def test_connect_skip_bootloader_failure(make_znp_server): znp_server = make_znp_server(server_cls=BaseServerZNP) znp = ZNP(config_for_port_path(znp_server.port_path)) @@ -96,6 +97,34 @@ async def test_connect_skip_bootloader_failure(make_znp_server, mocker): znp.close() +async def test_connect_skip_bootloader_rts_dtr_pins(make_znp_server, mocker): + znp_server = make_znp_server(server_cls=BaseServerZNP) + znp = ZNP(config_for_port_path(znp_server.port_path)) + + mocker.patch.object(znp.nvram, "determine_alignment", new=CoroutineMock()) + mocker.patch.object(znp, "detect_zstack_version", new=CoroutineMock()) + + num_pings = 0 + + def ping_rsp(req): + nonlocal num_pings + num_pings += 1 + + if num_pings >= 3: + return c.SYS.Ping.Rsp(Capabilities=t.MTCapabilities.SYS) + + # Ignore the first few pings so we trigger the pin toggling + znp_server.reply_to(c.SYS.Ping.Req(), responses=ping_rsp) + + await znp.connect(test_port=True) + + serial = znp._uart._transport + assert serial._mock_dtr_prop.mock_calls == [call(False), call(False), call(False)] + assert serial._mock_rts_prop.mock_calls == [call(False), call(True), call(False)] + + znp.close() + + async def test_api_close(connected_znp, mocker): znp, znp_server = connected_znp uart = znp._uart diff --git a/tests/conftest.py b/tests/conftest.py index 12e5e011..0b556acb 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -4,6 +4,7 @@ import logging import pathlib import contextlib +from unittest.mock import Mock, PropertyMock import pytest import zigpy.device @@ -37,16 +38,17 @@ class ForwardingSerialTransport: Serial transport that hooks directly into a protocol """ - class serial: - # so the transport has a `serial` attribute - name = FAKE_SERIAL_PORT - baudrate = 45678 - def __init__(self, protocol): self.protocol = protocol self._is_connected = False self.other = None + self.serial = Mock() + self.serial.name = FAKE_SERIAL_PORT + self.serial.baudrate = 45678 + type(self.serial).dtr = self._mock_dtr_prop = PropertyMock(return_value=None) + type(self.serial).rts = self._mock_rts_prop = PropertyMock(return_value=None) + def _connect(self): assert not self._is_connected self._is_connected = True @@ -87,7 +89,7 @@ def inner(server_cls, config=None, shorten_delays=True): if shorten_delays: mocker.patch("zigpy_znp.api.AFTER_BOOTLOADER_SKIP_BYTE_DELAY", 0.001) - mocker.patch("zigpy_znp.api.RTS_TOGGLE_DELAY", 0.001) + mocker.patch("zigpy_znp.api.BOOTLOADER_PIN_TOGGLE_DELAY", 0.001) server = server_cls(config) server._transports = transports diff --git a/zigpy_znp/api.py b/zigpy_znp/api.py index 30897f77..19599027 100644 --- a/zigpy_znp/api.py +++ b/zigpy_znp/api.py @@ -36,7 +36,7 @@ # All of these are in seconds AFTER_BOOTLOADER_SKIP_BYTE_DELAY = 2.5 NETWORK_COMMISSIONING_TIMEOUT = 30 -RTS_TOGGLE_DELAY = 0.15 +BOOTLOADER_PIN_TOGGLE_DELAY = 0.15 CONNECT_PING_TIMEOUT = 0.50 CONNECT_PROBE_TIMEOUT = 5.0 @@ -458,6 +458,10 @@ async def reset(self) -> None: def _port_path(self) -> str: return self._config[conf.CONF_DEVICE][conf.CONF_DEVICE_PATH] + @property + def _znp_config(self) -> conf.ConfigType: + return self._config[conf.CONF_ZNP_CONFIG] + async def _skip_bootloader(self) -> c.SYS.Ping.Rsp: """ Attempt to skip the bootloader and return the ping response. @@ -471,7 +475,9 @@ async def ping_task(): except asyncio.TimeoutError: pass - # If that doesn't work, send the bootloader skip bytes and try again + # If that doesn't work, send the bootloader skip bytes and try again. + # Sending a bunch at a time fixes UART issues when the radio was previously + # probed with another library at a different baudrate. self._uart.write(256 * bytes([c.ubl.BootloaderRunMode.FORCE_RUN])) await asyncio.sleep(AFTER_BOOTLOADER_SKIP_BYTE_DELAY) @@ -482,15 +488,16 @@ async def ping_task(): except asyncio.TimeoutError: pass - # Finally, toggle the RTS pin to skip Slaesh's bootloader - LOGGER.debug("Toggling RTS pin to skip CC2652R bootloader") + # This is normally done just for Slaesh's CC2652RB stick + LOGGER.debug("Toggling RTS/DTR pins to skip bootloader or reset chip") - self._uart.set_dtr_rts(dtr=False, rts=False) - await asyncio.sleep(RTS_TOGGLE_DELAY) - self._uart.set_dtr_rts(dtr=False, rts=True) - await asyncio.sleep(RTS_TOGGLE_DELAY) - self._uart.set_dtr_rts(dtr=False, rts=False) - await asyncio.sleep(RTS_TOGGLE_DELAY) + # The default sequence is DTR=false and RTS toggling false/true/false + for dtr, rts in zip( + self._znp_config[conf.CONF_CONNECT_DTR_STATES], + self._znp_config[conf.CONF_CONNECT_RTS_STATES], + ): + self._uart.set_dtr_rts(dtr=dtr, rts=rts) + await asyncio.sleep(BOOTLOADER_PIN_TOGGLE_DELAY) # At this point we have nothing else to try, don't catch the timeout async with async_timeout.timeout(CONNECT_PING_TIMEOUT): @@ -816,9 +823,7 @@ async def request(self, request: t.CommandBase, **response_params) -> t.CommandB self._uart.send(frame) # We should get a SRSP in a reasonable amount of time - async with async_timeout.timeout( - self._config[conf.CONF_ZNP_CONFIG][conf.CONF_SREQ_TIMEOUT] - ): + async with async_timeout.timeout(self._znp_config[conf.CONF_SREQ_TIMEOUT]): # We lock until either a sync response is seen or an error occurs response = await response_future @@ -851,7 +856,7 @@ async def request_callback_rsp( # Every request should have a timeout to prevent deadlocks if timeout is None: - timeout = self._config[conf.CONF_ZNP_CONFIG][conf.CONF_ARSP_TIMEOUT] + timeout = self._znp_config[conf.CONF_ARSP_TIMEOUT] callback_rsp, listener = self.wait_for_responses([callback], context=True) diff --git a/zigpy_znp/config.py b/zigpy_znp/config.py index d1c4faf1..cb197902 100644 --- a/zigpy_znp/config.py +++ b/zigpy_znp/config.py @@ -48,6 +48,20 @@ def validator(v): return validator +def keys_have_same_length(*keys): + def validator(config): + lengths = [len(config[k]) for k in keys] + + if len(set(lengths)) != 1: + raise vol.Invalid( + f"Values for {keys} must all have the same length: {lengths}" + ) + + return config + + return validator + + CONF_ZNP_CONFIG = "znp_config" CONF_TX_POWER = "tx_power" CONF_LED_MODE = "led_mode" @@ -56,6 +70,8 @@ def validator(v): CONF_ARSP_TIMEOUT = "async_response_timeout" CONF_AUTO_RECONNECT_RETRY_DELAY = "auto_reconnect_retry_delay" CONF_MAX_CONCURRENT_REQUESTS = "max_concurrent_requests" +CONF_CONNECT_RTS_STATES = "connect_rts_pin_states" +CONF_CONNECT_DTR_STATES = "connect_dtr_pin_states" CONFIG_SCHEMA = CONFIG_SCHEMA.extend( { @@ -77,7 +93,14 @@ def validator(v): vol.Optional(CONF_MAX_CONCURRENT_REQUESTS, default="auto"): vol.Any( "auto", VolPositiveNumber ), - } + vol.Optional( + CONF_CONNECT_RTS_STATES, default=[False, True, False] + ): vol.Schema([cv_boolean]), + vol.Optional( + CONF_CONNECT_DTR_STATES, default=[False, False, False] + ): vol.Schema([cv_boolean]), + }, + keys_have_same_length(CONF_CONNECT_RTS_STATES, CONF_CONNECT_DTR_STATES), ), } ) diff --git a/zigpy_znp/uart.py b/zigpy_znp/uart.py index 4eb97faf..5571e60d 100644 --- a/zigpy_znp/uart.py +++ b/zigpy_znp/uart.py @@ -98,6 +98,8 @@ def write(self, data: bytes) -> None: self._transport.write(data) def set_dtr_rts(self, *, dtr: bool, rts: bool) -> None: + LOGGER.debug("Setting serial pin states: DTR=%s, RTS=%s", dtr, rts) + self._transport.serial.dtr = dtr self._transport.serial.rts = rts From 60e7975c171b82faef96ae511de46a289df186e9 Mon Sep 17 00:00:00 2001 From: puddly <32534428+puddly@users.noreply.github.com> Date: Thu, 25 Nov 2021 17:02:25 -0500 Subject: [PATCH 3/4] Replace per-test `pytestmark` with a global one in `conftest` --- tests/api/test_connect.py | 2 -- tests/api/test_listeners.py | 2 -- tests/api/test_network_state.py | 2 -- tests/api/test_nvram.py | 2 -- tests/api/test_request.py | 2 -- tests/api/test_response.py | 2 -- tests/application/test_connect.py | 2 -- tests/application/test_joining.py | 2 -- tests/application/test_nvram_migration.py | 2 -- tests/application/test_requests.py | 2 -- tests/application/test_startup.py | 2 -- tests/application/test_zigpy_callbacks.py | 2 -- tests/conftest.py | 10 ++++++++++ tests/test_uart.py | 2 -- tests/test_utils.py | 1 - tests/tools/test_energy_scan.py | 2 -- tests/tools/test_flash.py | 3 --- tests/tools/test_form_network.py | 1 - tests/tools/test_network_backup_restore.py | 8 -------- tests/tools/test_network_scan.py | 2 -- tests/tools/test_nvram.py | 2 -- 21 files changed, 10 insertions(+), 45 deletions(-) diff --git a/tests/api/test_connect.py b/tests/api/test_connect.py index 2f30817a..c7168aab 100644 --- a/tests/api/test_connect.py +++ b/tests/api/test_connect.py @@ -9,8 +9,6 @@ from ..conftest import BaseServerZNP, CoroutineMock, config_for_port_path -pytestmark = [pytest.mark.asyncio] - async def test_connect_no_test(make_znp_server): znp_server = make_znp_server(server_cls=BaseServerZNP) diff --git a/tests/api/test_listeners.py b/tests/api/test_listeners.py index 98467494..0485d211 100644 --- a/tests/api/test_listeners.py +++ b/tests/api/test_listeners.py @@ -7,8 +7,6 @@ import zigpy_znp.commands as c from zigpy_znp.api import OneShotResponseListener, CallbackResponseListener -pytestmark = [pytest.mark.asyncio] - async def test_resolve(event_loop, mocker): callback = mocker.Mock() diff --git a/tests/api/test_network_state.py b/tests/api/test_network_state.py index ca12b6a7..62e712e6 100644 --- a/tests/api/test_network_state.py +++ b/tests/api/test_network_state.py @@ -4,8 +4,6 @@ from ..conftest import ALL_DEVICES, FORMED_DEVICES, BaseZStack1CC2531 -pytestmark = [pytest.mark.asyncio] - @pytest.mark.parametrize("to_device", ALL_DEVICES) @pytest.mark.parametrize("from_device", FORMED_DEVICES) diff --git a/tests/api/test_nvram.py b/tests/api/test_nvram.py index 5695cd16..38b1adfe 100644 --- a/tests/api/test_nvram.py +++ b/tests/api/test_nvram.py @@ -5,8 +5,6 @@ from zigpy_znp.types import nvids from zigpy_znp.exceptions import SecurityError -pytestmark = [pytest.mark.asyncio] - async def test_osal_writes_invalid(connected_znp): znp, _ = connected_znp diff --git a/tests/api/test_request.py b/tests/api/test_request.py index debe472c..4127c618 100644 --- a/tests/api/test_request.py +++ b/tests/api/test_request.py @@ -10,8 +10,6 @@ from zigpy_znp.frames import GeneralFrame from zigpy_znp.exceptions import CommandNotRecognized, InvalidCommandResponse -pytestmark = [pytest.mark.asyncio] - async def test_callback_rsp(connected_znp, event_loop): znp, znp_server = connected_znp diff --git a/tests/api/test_response.py b/tests/api/test_response.py index 059f92a7..264ff3ef 100644 --- a/tests/api/test_response.py +++ b/tests/api/test_response.py @@ -7,8 +7,6 @@ import zigpy_znp.commands as c from zigpy_znp.utils import deduplicate_commands -pytestmark = [pytest.mark.asyncio] - async def test_responses(connected_znp): znp, znp_server = connected_znp diff --git a/tests/application/test_connect.py b/tests/application/test_connect.py index cc1d140f..238a226e 100644 --- a/tests/application/test_connect.py +++ b/tests/application/test_connect.py @@ -8,8 +8,6 @@ from ..conftest import FORMED_DEVICES, FormedLaunchpadCC26X2R1, swap_attribute -pytestmark = [pytest.mark.asyncio] - async def test_no_double_connect(make_znp_server, mocker): znp_server = make_znp_server(server_cls=FormedLaunchpadCC26X2R1) diff --git a/tests/application/test_joining.py b/tests/application/test_joining.py index 4488213f..979d0883 100644 --- a/tests/application/test_joining.py +++ b/tests/application/test_joining.py @@ -16,8 +16,6 @@ FormedLaunchpadCC26X2R1, ) -pytestmark = [pytest.mark.asyncio] - @pytest.mark.parametrize( "device,fixed_joining_bug", diff --git a/tests/application/test_nvram_migration.py b/tests/application/test_nvram_migration.py index a09973a8..421d92d1 100644 --- a/tests/application/test_nvram_migration.py +++ b/tests/application/test_nvram_migration.py @@ -7,8 +7,6 @@ from ..conftest import FORMED_DEVICES, FormedZStack3CC2531 -pytestmark = [pytest.mark.asyncio] - @pytest.mark.parametrize("device", FORMED_DEVICES) async def test_addrmgr_empty_entries(make_connected_znp, device): diff --git a/tests/application/test_requests.py b/tests/application/test_requests.py index caca9b2c..f7083b9e 100644 --- a/tests/application/test_requests.py +++ b/tests/application/test_requests.py @@ -13,8 +13,6 @@ from ..conftest import FORMED_DEVICES, CoroutineMock, FormedLaunchpadCC26X2R1 -pytestmark = [pytest.mark.asyncio] - @pytest.mark.parametrize("device", FORMED_DEVICES) async def test_zdo_request_interception(device, make_application): diff --git a/tests/application/test_startup.py b/tests/application/test_startup.py index 88f5c12d..bd0b0edd 100644 --- a/tests/application/test_startup.py +++ b/tests/application/test_startup.py @@ -18,8 +18,6 @@ FormedLaunchpadCC26X2R1, ) -pytestmark = [pytest.mark.asyncio] - DEV_NETWORK_SETTINGS = { FormedLaunchpadCC26X2R1: ( "CC1352/CC2652, Z-Stack 3.30+ (build 20200805)", diff --git a/tests/application/test_zigpy_callbacks.py b/tests/application/test_zigpy_callbacks.py index 2f8fb760..4db159ff 100644 --- a/tests/application/test_zigpy_callbacks.py +++ b/tests/application/test_zigpy_callbacks.py @@ -9,8 +9,6 @@ from ..conftest import FORMED_DEVICES, CoroutineMock -pytestmark = [pytest.mark.asyncio] - def awaitable_mock(return_value): mock_called = asyncio.get_running_loop().create_future() diff --git a/tests/conftest.py b/tests/conftest.py index 0b556acb..6dc2c4fe 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -33,6 +33,16 @@ FAKE_SERIAL_PORT = "/dev/ttyFAKE0" +# Globally handle async tests and error on unawaited coroutines +def pytest_collection_modifyitems(session, config, items): + for item in items: + item.add_marker( + pytest.mark.filterwarnings("error::pytest.PytestUnraisableExceptionWarning") + ) + item.add_marker(pytest.mark.filterwarnings("error::RuntimeWarning")) + item.add_marker(pytest.mark.asyncio) + + class ForwardingSerialTransport: """ Serial transport that hooks directly into a protocol diff --git a/tests/test_uart.py b/tests/test_uart.py index 922d7c36..3cfc6e2b 100644 --- a/tests/test_uart.py +++ b/tests/test_uart.py @@ -221,7 +221,6 @@ def test_uart_frame_received_error(connected_uart, mocker): znp.frame_received.call_count == 3 -@pytest.mark.asyncio async def test_connection_lost(dummy_serial_conn, mocker, event_loop): device, _ = dummy_serial_conn @@ -240,7 +239,6 @@ async def test_connection_lost(dummy_serial_conn, mocker, event_loop): assert (await conn_lost_fut) == exception -@pytest.mark.asyncio async def test_connection_made(dummy_serial_conn, mocker): device, _ = dummy_serial_conn znp = mocker.Mock() diff --git a/tests/test_utils.py b/tests/test_utils.py index 8dba6bcd..aa23370e 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -63,7 +63,6 @@ def test_command_deduplication_complex(): } -@pytest.mark.asyncio async def test_combine_concurrent_calls(): class TestFuncs: def __init__(self): diff --git a/tests/tools/test_energy_scan.py b/tests/tools/test_energy_scan.py index 0e146495..69be7549 100644 --- a/tests/tools/test_energy_scan.py +++ b/tests/tools/test_energy_scan.py @@ -9,7 +9,6 @@ from ..conftest import EMPTY_DEVICES, FORMED_DEVICES -@pytest.mark.asyncio @pytest.mark.parametrize("device", EMPTY_DEVICES) async def test_energy_scan_unformed(device, make_znp_server, caplog): znp_server = make_znp_server(server_cls=device) @@ -18,7 +17,6 @@ async def test_energy_scan_unformed(device, make_znp_server, caplog): assert "Form a network" in caplog.text -@pytest.mark.asyncio @pytest.mark.parametrize("device", FORMED_DEVICES) async def test_energy_scan_formed(device, make_znp_server, capsys): znp_server = make_znp_server(server_cls=device) diff --git a/tests/tools/test_flash.py b/tests/tools/test_flash.py index ba3a2635..98607d5a 100644 --- a/tests/tools/test_flash.py +++ b/tests/tools/test_flash.py @@ -9,9 +9,6 @@ from ..conftest import BaseServerZNP, CoroutineMock -pytestmark = [pytest.mark.asyncio] - - random.seed(12345) FAKE_IMAGE_SIZE = 2 ** 10 FAKE_FLASH = bytearray( diff --git a/tests/tools/test_form_network.py b/tests/tools/test_form_network.py index b9d5c797..92b587de 100644 --- a/tests/tools/test_form_network.py +++ b/tests/tools/test_form_network.py @@ -6,7 +6,6 @@ from ..conftest import ALL_DEVICES, EMPTY_DEVICES -@pytest.mark.asyncio @pytest.mark.parametrize("device", ALL_DEVICES) async def test_form_network(device, make_znp_server): znp_server = make_znp_server(server_cls=device) diff --git a/tests/tools/test_network_backup_restore.py b/tests/tools/test_network_backup_restore.py index 129ceeba..7b681daa 100644 --- a/tests/tools/test_network_backup_restore.py +++ b/tests/tools/test_network_backup_restore.py @@ -135,7 +135,6 @@ def test_schema_validation_device_key_info(backup_json): @pytest.mark.parametrize("device", EMPTY_DEVICES) -@pytest.mark.asyncio async def test_network_backup_empty(device, make_znp_server): znp_server = make_znp_server(server_cls=device) @@ -144,7 +143,6 @@ async def test_network_backup_empty(device, make_znp_server): @pytest.mark.parametrize("device", [FormedZStack1CC2531]) -@pytest.mark.asyncio async def test_network_backup_pipe(device, make_znp_server, capsys): znp_server = make_znp_server(server_cls=device) @@ -155,7 +153,6 @@ async def test_network_backup_pipe(device, make_znp_server, capsys): @pytest.mark.parametrize("device", FORMED_DEVICES) -@pytest.mark.asyncio async def test_network_backup_formed(device, make_znp_server, tmp_path): znp_server = make_znp_server(server_cls=device) @@ -192,7 +189,6 @@ async def test_network_backup_formed(device, make_znp_server, tmp_path): @pytest.mark.parametrize("device", ALL_DEVICES) -@pytest.mark.asyncio async def test_network_restore_and_backup( device, make_znp_server, backup_json, tmp_path ): @@ -230,7 +226,6 @@ async def test_network_restore_and_backup( @pytest.mark.parametrize("device", [ResetLaunchpadCC26X2R1]) -@pytest.mark.asyncio async def test_network_restore_pick_optimal_tclk( device, make_znp_server, backup_json, tmp_path ): @@ -254,7 +249,6 @@ async def test_network_restore_pick_optimal_tclk( assert backup_json2["stack_specific"]["zstack"]["tclk_seed"] == old_tclk_seed -@pytest.mark.asyncio async def test_tc_frame_counter_zstack1(make_connected_znp): znp, znp_server = await make_connected_znp(BaseZStack1CC2531) znp_server._nvram[ExNvIds.LEGACY] = { @@ -267,7 +261,6 @@ async def test_tc_frame_counter_zstack1(make_connected_znp): assert (await security.read_tc_frame_counter(znp)) == 0xAABBCCDD -@pytest.mark.asyncio async def test_tc_frame_counter_zstack30(make_connected_znp): znp, znp_server = await make_connected_znp(BaseZStack3CC2531) znp.network_info = BARE_NETWORK_INFO @@ -299,7 +292,6 @@ async def test_tc_frame_counter_zstack30(make_connected_znp): assert (await security.read_tc_frame_counter(znp)) == 0xAABBCCDD -@pytest.mark.asyncio async def test_tc_frame_counter_zstack33(make_connected_znp): znp, znp_server = await make_connected_znp(BaseLaunchpadCC26X2R1) znp.network_info = BARE_NETWORK_INFO diff --git a/tests/tools/test_network_scan.py b/tests/tools/test_network_scan.py index 49e290a6..cda1cb66 100644 --- a/tests/tools/test_network_scan.py +++ b/tests/tools/test_network_scan.py @@ -8,8 +8,6 @@ from ..conftest import FormedZStack1CC2531, FormedLaunchpadCC26X2R1 -pytestmark = [pytest.mark.asyncio] - @pytest.mark.parametrize("device", [FormedLaunchpadCC26X2R1]) async def test_network_scan(device, make_znp_server, capsys): diff --git a/tests/tools/test_nvram.py b/tests/tools/test_nvram.py index a015d068..b238e1ec 100644 --- a/tests/tools/test_nvram.py +++ b/tests/tools/test_nvram.py @@ -12,8 +12,6 @@ from ..conftest import ALL_DEVICES, BaseZStack1CC2531, FormedLaunchpadCC26X2R1 -pytestmark = [pytest.mark.asyncio] - def dump_nvram(znp): obj = {} From dedb473664a55c12aeff70277164e321f3d05bc6 Mon Sep 17 00:00:00 2001 From: puddly <32534428+puddly@users.noreply.github.com> Date: Sun, 28 Nov 2021 21:03:23 -0500 Subject: [PATCH 4/4] Fully test the config schema changes --- tests/test_config.py | 71 ++++++++++++++++++++++++++++++++++++++------ zigpy_znp/config.py | 50 ++++++++++++++++--------------- 2 files changed, 88 insertions(+), 33 deletions(-) diff --git a/tests/test_config.py b/tests/test_config.py index ef046842..13cec41b 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -1,6 +1,9 @@ import enum -from zigpy_znp.config import EnumValue +import pytest +from voluptuous import Invalid + +import zigpy_znp.config as conf def test_EnumValue(): @@ -8,15 +11,65 @@ class TestEnum(enum.Enum): foo = 123 BAR = 456 - assert EnumValue(TestEnum)("foo") == TestEnum.foo - assert EnumValue(TestEnum)("BAR") == TestEnum.BAR + assert conf.EnumValue(TestEnum)("foo") == TestEnum.foo + assert conf.EnumValue(TestEnum)("BAR") == TestEnum.BAR + + assert conf.EnumValue(TestEnum, transformer=str.lower)("FOO") == TestEnum.foo + assert conf.EnumValue(TestEnum, transformer=str.upper)("bar") == TestEnum.BAR + + assert conf.EnumValue(TestEnum)(TestEnum.foo) == TestEnum.foo + assert conf.EnumValue(TestEnum)(TestEnum.BAR) == TestEnum.BAR + - assert ( - EnumValue(TestEnum, transformer=lambda s: str(s).lower())("FOO") == TestEnum.foo +def test_pin_states_same_lengths(): + # Bare schema works + conf.CONFIG_SCHEMA( + { + conf.CONF_DEVICE: {conf.CONF_DEVICE_PATH: "/dev/null"}, + conf.CONF_ZNP_CONFIG: {}, + } ) - assert ( - EnumValue(TestEnum, transformer=lambda s: str(s).upper())("bar") == TestEnum.BAR + + # So does one with explicitly specified pin states + config = conf.CONFIG_SCHEMA( + { + conf.CONF_DEVICE: {conf.CONF_DEVICE_PATH: "/dev/null"}, + conf.CONF_ZNP_CONFIG: { + conf.CONF_CONNECT_RTS_STATES: ["on", True, 0, 0, 0, 1, 1], + conf.CONF_CONNECT_DTR_STATES: ["off", False, 1, 0, 0, 1, 1], + }, + } ) - assert EnumValue(TestEnum)(TestEnum.foo) == TestEnum.foo - assert EnumValue(TestEnum)(TestEnum.BAR) == TestEnum.BAR + assert config[conf.CONF_ZNP_CONFIG][conf.CONF_CONNECT_RTS_STATES] == [ + True, + True, + False, + False, + False, + True, + True, + ] + assert config[conf.CONF_ZNP_CONFIG][conf.CONF_CONNECT_DTR_STATES] == [ + False, + False, + True, + False, + False, + True, + True, + ] + + +def test_pin_states_different_lengths(): + # They must be the same length + with pytest.raises(Invalid): + conf.CONFIG_SCHEMA( + { + conf.CONF_DEVICE: {conf.CONF_DEVICE_PATH: "/dev/null"}, + conf.CONF_ZNP_CONFIG: { + conf.CONF_CONNECT_RTS_STATES: [1, 1, 0], + conf.CONF_CONNECT_DTR_STATES: [1, 1], + }, + } + ) diff --git a/zigpy_znp/config.py b/zigpy_znp/config.py index cb197902..3d432195 100644 --- a/zigpy_znp/config.py +++ b/zigpy_znp/config.py @@ -77,30 +77,32 @@ def validator(config): { vol.Required(CONF_DEVICE): SCHEMA_DEVICE, vol.Optional(CONF_ZNP_CONFIG, default={}): vol.Schema( - { - vol.Optional(CONF_TX_POWER, default=None): vol.Any( - None, vol.All(int, vol.Range(min=-22, max=22)) - ), - vol.Optional(CONF_SREQ_TIMEOUT, default=15): VolPositiveNumber, - vol.Optional(CONF_ARSP_TIMEOUT, default=30): VolPositiveNumber, - vol.Optional( - CONF_AUTO_RECONNECT_RETRY_DELAY, default=5 - ): VolPositiveNumber, - vol.Optional(CONF_SKIP_BOOTLOADER, default=True): cv_boolean, - vol.Optional(CONF_LED_MODE, default=LEDMode.OFF): vol.Any( - None, EnumValue(LEDMode, lambda v: str(v).upper()) - ), - vol.Optional(CONF_MAX_CONCURRENT_REQUESTS, default="auto"): vol.Any( - "auto", VolPositiveNumber - ), - vol.Optional( - CONF_CONNECT_RTS_STATES, default=[False, True, False] - ): vol.Schema([cv_boolean]), - vol.Optional( - CONF_CONNECT_DTR_STATES, default=[False, False, False] - ): vol.Schema([cv_boolean]), - }, - keys_have_same_length(CONF_CONNECT_RTS_STATES, CONF_CONNECT_DTR_STATES), + vol.All( + { + vol.Optional(CONF_TX_POWER, default=None): vol.Any( + None, vol.All(int, vol.Range(min=-22, max=22)) + ), + vol.Optional(CONF_SREQ_TIMEOUT, default=15): VolPositiveNumber, + vol.Optional(CONF_ARSP_TIMEOUT, default=30): VolPositiveNumber, + vol.Optional( + CONF_AUTO_RECONNECT_RETRY_DELAY, default=5 + ): VolPositiveNumber, + vol.Optional(CONF_SKIP_BOOTLOADER, default=True): cv_boolean, + vol.Optional(CONF_LED_MODE, default=LEDMode.OFF): vol.Any( + None, EnumValue(LEDMode, lambda v: str(v).upper()) + ), + vol.Optional(CONF_MAX_CONCURRENT_REQUESTS, default="auto"): vol.Any( + "auto", VolPositiveNumber + ), + vol.Optional( + CONF_CONNECT_RTS_STATES, default=[False, True, False] + ): vol.Schema([cv_boolean]), + vol.Optional( + CONF_CONNECT_DTR_STATES, default=[False, False, False] + ): vol.Schema([cv_boolean]), + }, + keys_have_same_length(CONF_CONNECT_RTS_STATES, CONF_CONNECT_DTR_STATES), + ) ), } )