Skip to content

Implement the zigpy channel changing API #212

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Apr 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ readme = "README.md"
license = {text = "GPL-3.0"}
requires-python = ">=3.8"
dependencies = [
"zigpy>=0.52.0",
"zigpy>=0.55.0",
"async_timeout",
"voluptuous",
"coloredlogs",
Expand Down
36 changes: 9 additions & 27 deletions tests/application/test_zdo_requests.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
import asyncio

import pytest
import zigpy.zdo
import zigpy.types as zigpy_t
import zigpy.zdo.types as zdo_t

import zigpy_znp.types as t
Expand All @@ -12,19 +10,19 @@


@pytest.mark.parametrize(
"broadcast,nwk_update_id,change_channel",
"nwk_update_id,change_channel",
[
(False, 1, False),
(False, 1, True),
(True, 1, False),
(False, 200, True),
(1, False),
(1, True),
(1, False),
(200, True),
],
)
@pytest.mark.parametrize("device", [FormedLaunchpadCC26X2R1])
async def test_mgmt_nwk_update_req(
device, broadcast, nwk_update_id, change_channel, make_application, mocker
device, nwk_update_id, change_channel, make_application, mocker
):
mocker.patch("zigpy_znp.zigbee.device.NWK_UPDATE_LOOP_DELAY", 0.1)
mocker.patch("zigpy.application.CHANNEL_CHANGE_SETTINGS_RELOAD_DELAY_S", 0.1)

app, znp_server = make_application(server_cls=device)

Expand Down Expand Up @@ -72,29 +70,13 @@ async def update_channel(req):

await app.startup(auto_form=False)

update = zdo_t.NwkUpdate(
ScanChannels=t.Channels.from_channel_list([new_channel]),
ScanDuration=zdo_t.NwkUpdate.CHANNEL_CHANGE_REQ,
nwkUpdateId=nwk_update_id,
)

if broadcast:
await zigpy.zdo.broadcast(
app,
zdo_t.ZDOCmd.Mgmt_NWK_Update_req,
0x0000, # group id (ignore)
0, # radius
update,
broadcast_address=zigpy_t.BroadcastAddress.ALL_ROUTERS_AND_COORDINATOR,
)
else:
await app._device.zdo.Mgmt_NWK_Update_req(update)
await app.move_network_to_channel(new_channel=new_channel)

if change_channel:
await nwk_update_req
else:
assert not nwk_update_req.done()

assert znp_server.nib.nwkLogicalChannel == list(update.ScanChannels)[0]
assert znp_server.nib.nwkLogicalChannel == new_channel

await app.shutdown()
17 changes: 17 additions & 0 deletions zigpy_znp/zigbee/application.py
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,23 @@ async def permit_with_key(self, node: t.EUI64, code: bytes, time_s=60):
RspStatus=t.Status.SUCCESS,
)

async def _move_network_to_channel(
self, new_channel: int, new_nwk_update_id: int
) -> None:
"""Moves device to a new channel."""
await self._znp.request(
request=c.ZDO.MgmtNWKUpdateReq.Req(
Dst=0x0000,
DstAddrMode=t.AddrMode.NWK,
Channels=t.Channels.from_channel_list([new_channel]),
ScanDuration=zdo_t.NwkUpdate.CHANNEL_CHANGE_REQ,
ScanCount=0,
NwkManagerAddr=0x0000,
# `new_nwk_update_id` is ignored
),
RspStatus=t.Status.SUCCESS,
)

def connection_lost(self, exc):
"""
Propagated up from UART through ZNP when the connection is lost.
Expand Down
121 changes: 0 additions & 121 deletions zigpy_znp/zigbee/device.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,11 @@
from __future__ import annotations

import asyncio
import logging

import zigpy.zdo
import zigpy.device
import zigpy.zdo.types as zdo_t
import zigpy.application

import zigpy_znp.types as t
import zigpy_znp.commands as c
import zigpy_znp.zigbee.application as znp_app

LOGGER = logging.getLogger(__name__)

NWK_UPDATE_LOOP_DELAY = 1
Expand All @@ -22,13 +16,6 @@ class ZNPCoordinator(zigpy.device.Device):
Coordinator zigpy device that keeps track of our endpoints and clusters.
"""

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)

assert hasattr(self, "zdo")
self.zdo = ZNPZDOEndpoint(self)
self.endpoints[0] = self.zdo

@property
def manufacturer(self):
return "Texas Instruments"
Expand Down Expand Up @@ -72,111 +59,3 @@ def request(
timeout=timeout,
use_ieee=use_ieee,
)


class ZNPZDOEndpoint(zigpy.zdo.ZDO):
@property
def app(self) -> zigpy.application.ControllerApplication:
return self.device.application

def _send_loopback_reply(
self, command_id: zdo_t.ZDOCmd, *, tsn: t.uint8_t, **kwargs
):
"""
Constructs and sends back a loopback ZDO response.
"""

message = t.uint8_t(tsn).serialize() + self._serialize(
command_id, *kwargs.values()
)

LOGGER.debug("Sending loopback reply %s (%s), tsn=%s", command_id, kwargs, tsn)

self.app.handle_message(
sender=self.app._device,
profile=znp_app.ZDO_PROFILE,
cluster=command_id,
src_ep=znp_app.ZDO_ENDPOINT,
dst_ep=znp_app.ZDO_ENDPOINT,
message=message,
)

def handle_mgmt_nwk_update_req(
self, hdr: zdo_t.ZDOHeader, NwkUpdate: zdo_t.NwkUpdate, *, dst_addressing
):
"""
Handles ZDO `Mgmt_NWK_Update_req` sent to the coordinator.
"""

self.create_catching_task(
self.async_handle_mgmt_nwk_update_req(
hdr, NwkUpdate, dst_addressing=dst_addressing
)
)

async def async_handle_mgmt_nwk_update_req(
self, hdr: zdo_t.ZDOHeader, NwkUpdate: zdo_t.NwkUpdate, *, dst_addressing
):
# Energy scans are handled properly by Z-Stack, no need to do anything
if NwkUpdate.ScanDuration not in (
zdo_t.NwkUpdate.CHANNEL_CHANGE_REQ,
zdo_t.NwkUpdate.CHANNEL_MASK_MANAGER_ADDR_CHANGE_REQ,
):
return

old_network_info = self.app.state.network_info

if (
t.Channels.from_channel_list([old_network_info.channel])
== NwkUpdate.ScanChannels
):
LOGGER.warning("NWK update request is ignored when channel does not change")
self._send_loopback_reply(
zdo_t.ZDOCmd.Mgmt_NWK_Update_rsp,
Status=zdo_t.Status.SUCCESS,
ScannedChannels=t.Channels.NO_CHANNELS,
TotalTransmissions=0,
TransmissionFailures=0,
EnergyValues=[],
tsn=hdr.tsn,
)
return

await self.app._znp.request(
request=c.ZDO.MgmtNWKUpdateReq.Req(
Dst=0x0000,
DstAddrMode=t.AddrMode.NWK,
Channels=NwkUpdate.ScanChannels,
ScanDuration=NwkUpdate.ScanDuration,
# Missing fields in the request cannot be `None` in the Z-Stack command
ScanCount=NwkUpdate.ScanCount or 0,
NwkManagerAddr=NwkUpdate.nwkManagerAddr or 0x0000,
),
RspStatus=t.Status.SUCCESS,
)

# Wait until the network info changes, it can take ~5s
while (
self.app.state.network_info.nwk_update_id == old_network_info.nwk_update_id
):
await self.app.load_network_info(load_devices=False)
await asyncio.sleep(NWK_UPDATE_LOOP_DELAY)

# Z-Stack automatically increments the NWK update ID instead of setting it
# TODO: Directly set it once radio settings API is finalized.
if NwkUpdate.nwkUpdateId != self.app.state.network_info.nwk_update_id:
LOGGER.warning(
f"`nwkUpdateId` was incremented to"
f" {self.app.state.network_info.nwk_update_id} instead of being"
f" set to {NwkUpdate.nwkUpdateId}"
)

self._send_loopback_reply(
zdo_t.ZDOCmd.Mgmt_NWK_Update_rsp,
Status=zdo_t.Status.SUCCESS,
ScannedChannels=t.Channels.NO_CHANNELS,
TotalTransmissions=0,
TransmissionFailures=0,
EnergyValues=[],
tsn=hdr.tsn,
)