diff --git a/tests/test_application.py b/tests/test_application.py index 5d293e5..48030a1 100644 --- a/tests/test_application.py +++ b/tests/test_application.py @@ -4,8 +4,8 @@ import pytest from zigpy.exceptions import DeliveryError -from zigpy.types import EUI64, uint16_t -from zigpy.zdo.types import LogicalType +from zigpy import types as t +from zigpy.zdo.types import LogicalType, ZDOCmd from zigpy_xbee.api import ModemStatus, XBee from zigpy_xbee.zigbee import application @@ -143,7 +143,7 @@ def test_rx_failed_deserialize(app, caplog): app._handle_reply = mock.MagicMock() app.handle_message = mock.MagicMock() nwk = 0x1234 - ieee, _ = EUI64.deserialize(b'\x01\x02\x03\x04\x05\x06\x07\x08') + ieee, _ = t.EUI64.deserialize(b'\x01\x02\x03\x04\x05\x06\x07\x08') device = app.add_device(ieee, nwk) device.status = DeviceStatus.ENDPOINTS_INIT app.get_device = mock.MagicMock(return_value=device) @@ -168,10 +168,10 @@ def test_rx_failed_deserialize(app, caplog): @pytest.fixture def device(app): - def _device(new=False, zdo_init=False, nwk=uint16_t(0x1234)): + def _device(new=False, zdo_init=False, nwk=t.uint16_t(0x1234)): from zigpy.device import Device, Status as DeviceStatus - ieee, _ = EUI64.deserialize(b'\x08\x07\x06\x05\x04\x03\x02\x01') + ieee, _ = t.EUI64.deserialize(b'\x08\x07\x06\x05\x04\x03\x02\x01') dev = Device(app, ieee, nwk) if new: dev.status = DeviceStatus.NEW @@ -368,43 +368,43 @@ async def test_startup_ai(app): auto_form = True await _test_startup(app, 0x00, auto_form) assert app._nwk == 0x0000 - assert app._ieee == EUI64(range(1, 9)) + assert app._ieee == t.EUI64(range(1, 9)) assert app.form_network.call_count == 0 auto_form = False await _test_startup(app, 0x00, auto_form) assert app._nwk == 0x0000 - assert app._ieee == EUI64(range(1, 9)) + assert app._ieee == t.EUI64(range(1, 9)) assert app.form_network.call_count == 0 auto_form = True await _test_startup(app, 0x06, auto_form) assert app._nwk == 0xfffe - assert app._ieee == EUI64(range(1, 9)) + assert app._ieee == t.EUI64(range(1, 9)) assert app.form_network.call_count == 1 auto_form = False await _test_startup(app, 0x06, auto_form) assert app._nwk == 0xfffe - assert app._ieee == EUI64(range(1, 9)) + assert app._ieee == t.EUI64(range(1, 9)) assert app.form_network.call_count == 0 auto_form = True await _test_startup(app, 0x00, auto_form, zs=1) assert app._nwk == 0x0000 - assert app._ieee == EUI64(range(1, 9)) + assert app._ieee == t.EUI64(range(1, 9)) assert app.form_network.call_count == 1 auto_form = False await _test_startup(app, 0x06, auto_form, legacy_module=True) assert app._nwk == 0xfffe - assert app._ieee == EUI64(range(1, 9)) + assert app._ieee == t.EUI64(range(1, 9)) assert app.form_network.call_count == 0 auto_form = True await _test_startup(app, 0x00, auto_form, zs=1, legacy_module=True) assert app._nwk == 0x0000 - assert app._ieee == EUI64(range(1, 9)) + assert app._ieee == t.EUI64(range(1, 9)) assert app.form_network.call_count == 1 @@ -413,7 +413,7 @@ async def test_startup_no_api_mode(app): auto_form = True await _test_startup(app, 0x00, auto_form, api_mode=False) assert app._nwk == 0x0000 - assert app._ieee == EUI64(range(1, 9)) + assert app._ieee == t.EUI64(range(1, 9)) assert app.form_network.call_count == 0 assert app._api.init_api_mode.call_count == 1 assert app._api._at_command.call_count >= 16 @@ -446,7 +446,7 @@ async def _test_request(app, do_reply=True, expect_reply=True, logical_type=None, **kwargs): seq = 123 nwk = 0x2345 - ieee = EUI64(b'\x01\x02\x03\x04\x05\x06\x07\x08') + ieee = t.EUI64(b'\x01\x02\x03\x04\x05\x06\x07\x08') dev = app.add_device(ieee, nwk) dev.node_desc = mock.MagicMock() dev.node_desc.logical_type = logical_type @@ -591,3 +591,54 @@ def test_remote_at_cmd(app, device): assert app._api._remote_at_command.call_args[0][2] == 0x22 assert app._api._remote_at_command.call_args[0][3] == s.cmd assert app._api._remote_at_command.call_args[0][4] == s.data + + +@pytest.fixture +def ieee(): + return t.EUI64.deserialize(b'\x00\x01\x02\x03\x04\x05\x06\x07')[0] + + +@pytest.fixture +def nwk(): + return t.uint16_t(0x0100) + + +def test_rx_device_annce(app, ieee, nwk): + dst_ep = 0 + cluster_id = ZDOCmd.Device_annce + device = mock.MagicMock() + device.status = device.Status.NEW + app.get_device = mock.MagicMock(return_value=device) + app.deserialize = mock.MagicMock(return_value=(mock.sentinel.tsn, + mock.sentinel.cmd_id, + False, + mock.sentinel.args, )) + app.handle_join = mock.MagicMock() + app._handle_reply = mock.MagicMock() + app.handle_message = mock.MagicMock() + + data = t.uint8_t(0xaa).serialize() + data += nwk.serialize() + data += ieee.serialize() + data += t.uint8_t(0x8e).serialize() + + app.handle_rx( + ieee, + nwk, + mock.sentinel.src_ep, + dst_ep, + cluster_id, + mock.sentinel.profile_id, + mock.sentinel.rx_opt, + data, + ) + + assert app.deserialize.call_count == 1 + assert app.deserialize.call_args[0][2] == cluster_id + assert app.deserialize.call_args[0][3] == data + assert app._handle_reply.call_count == 0 + assert app.handle_message.call_count == 1 + assert app.handle_join.call_count == 1 + assert app.handle_join.call_args[0][0] == nwk + assert app.handle_join.call_args[0][1] == ieee + assert app.handle_join.call_args[0][2] == 0 diff --git a/zigpy_xbee/zigbee/application.py b/zigpy_xbee/zigbee/application.py index 93179ed..d0fac26 100644 --- a/zigpy_xbee/zigbee/application.py +++ b/zigpy_xbee/zigbee/application.py @@ -210,8 +210,8 @@ def handle_rx(self, src_ieee, src_nwk, src_ep, dst_ep, cluster_id, profile_id, r ember_ieee = zigpy.types.EUI64(src_ieee) if dst_ep == 0 and cluster_id == 0x13: # ZDO Device announce request - nwk, data = zigpy.types.uint16_t.deserialize(data[1:]) - ieee, data = zigpy.types.EUI64.deserialize(data) + nwk, rest = zigpy.types.NWK.deserialize(data[1:]) + ieee, rest = zigpy.types.EUI64.deserialize(rest) LOGGER.info("New device joined: NWK 0x%04x, IEEE %s", nwk, ieee) if ember_ieee != ieee: LOGGER.warning(