Skip to content

[WIP] Use a dedicated thread for Zigpy ZNP's serial communication #167

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
197 changes: 197 additions & 0 deletions zigpy_znp/async_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
import asyncio
import logging
import functools
import threading

LOGGER = logging.getLogger(__name__)


_znp_loop = None # the loop in which the serial communication will be handled
_worker_loop = (
None # the loop in which the frames are handled (MainThread in home assistant)
)

# if there is a need to create a worker loop this will be the thread it is running in
_worker_loop_thread = None


def try_get_running_loop_as_worker_loop():
"""
this function will set the worker loop to the currently running loop
(if there is one).
"""
global _worker_loop
if _worker_loop is None:
try:
_worker_loop = asyncio.get_running_loop()
except RuntimeError:
pass


# this will get the running loop in case of integration in home assistant
# if there is no running loop, a loop will be created later
try_get_running_loop_as_worker_loop()


def get_worker_loop():
"""
Getter for the worker loop.
"""
global _worker_loop
if _worker_loop is None:
try:
_worker_loop = asyncio.get_running_loop()
LOGGER.info("used asyncio's running loop")
except RuntimeError:
create_new_worker_loop(True)
return _worker_loop


def get_znp_loop():
"""
Getter for the ZNP serial loop.
"""
return _znp_loop


def start_worker_loop_in_thread():
"""
Create a thread and run the worker loop.
"""
global _worker_loop_thread, _worker_loop
if _worker_loop_thread is None and _worker_loop is not None:

def run_worker_loop():
asyncio.set_event_loop(_worker_loop)
_worker_loop.run_forever()

_worker_loop_thread = threading.Thread(
target=run_worker_loop, daemon=True, name="ZigpyWorkerThread"
)
_worker_loop_thread.start()


def create_new_worker_loop(start_thread: bool = True):
"""
Creates a new worker loop, starts a new thread too, if start_thread is True.
"""
global _worker_loop
LOGGER.info("creating new event loop as worker loop")
_worker_loop = asyncio.new_event_loop()
if start_thread:
start_worker_loop_in_thread()


def init_znp_loop():
"""
Create and run ZNP loop.
"""
global _znp_loop
if _znp_loop is None:
_znp_loop = asyncio.new_event_loop()

def run_znp_loop():
# asyncio.set_event_loop(_znp_loop)
_znp_loop.run_forever()

znp_thread = threading.Thread(
target=run_znp_loop, daemon=True, name="ZigpySerialThread"
)
znp_thread.start()


# will create and start a new ZNP loop on module initialization
if _znp_loop is None:
init_znp_loop()


def run_in_loop(
function, loop=None, loop_getter=None, wait_for_result: bool = True, *args, **kwargs
):
"""
Can be used as decorator or as normal function.
Will run the function in the specified loop.
@param function:
The co-routine that shall be run (function call only)
@param loop:
Loop in which the co-routine shall run (either loop or loop_getter must be set)
@param loop_getter:
Getter for the loop in which the co-routine shall run
(either loop or loop_getter must be set)
@param wait_for_result:
Will "fire and forget" if false. Otherwise,
the return value of the coro is returned.
@param args: args
@param kwargs: kwargs
@return:
None if wait_for_result is false. Otherwise, the return value of the co-routine.
"""
if loop is None and loop_getter is None:
raise RuntimeError("either loop or loop_getter must be passed to run_in_loop")

if asyncio.iscoroutine(function):
# called as a function call
_loop = loop if loop is not None else loop_getter()
future = asyncio.run_coroutine_threadsafe(function, _loop)
return future.result() if wait_for_result else None
else:
# probably a decorator

# wrap the function in a new function,
# that will run the co-routine in the loop provided
@functools.wraps(function)
def new_sync(*args, **kwargs):
loop if loop is not None else loop_getter()
return run_in_loop(
function(*args, **kwargs),
loop=loop,
loop_getter=loop_getter,
wait_for_result=wait_for_result,
)

if not asyncio.iscoroutinefunction(function):
return new_sync
else:
# wrap the function again in an async function, so that it can be awaited
async def new_async(*args, **kwargs):
return new_sync(*args, **kwargs)

return new_async


def run_in_znp_loop(*args, **kwargs):
"""
Can be used as decorator or as normal function.
Will run the function in the znp loop.
@param function:
The co-routine that shall be run (function call only)
@param wait_for_result:
Will "fire and forget" if false.
Otherwise, the return value of the coro is returned.
@param args: args
@param kwargs: kwargs
@return:
None if wait_for_result is false.
Otherwise, the return value of the co-routine.
"""
kwargs["loop_getter"] = get_znp_loop
return run_in_loop(*args, **kwargs)


def run_in_worker_loop(*args, **kwargs):
"""
Can be used as decorator or as normal function.
Will run the function in the worker loop.
@param function:
The co-routine that shall be run (function call only)
@param wait_for_result:
Will "fire and forget" if false.
Otherwise, the return value of the coro is returned.
@param args: args
@param kwargs: kwargs
@return:
None if wait_for_result is false.
Otherwise, the return value of the co-routine.
"""
kwargs["loop_getter"] = get_worker_loop
return run_in_loop(*args, **kwargs)
4 changes: 2 additions & 2 deletions zigpy_znp/tools/energy_scan.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import sys
import asyncio
import logging
import itertools
from collections import deque, defaultdict
Expand All @@ -8,6 +7,7 @@
from zigpy.exceptions import NetworkNotFormed

import zigpy_znp.types as t
from zigpy_znp import async_utils
from zigpy_znp.tools.common import setup_parser
from zigpy_znp.zigbee.application import ControllerApplication

Expand Down Expand Up @@ -93,4 +93,4 @@ async def main(argv):


if __name__ == "__main__":
asyncio.run(main(sys.argv[1:])) # pragma: no cover
async_utils.run_in_worker_loop(main(sys.argv[1:])) # pragma: no cover
3 changes: 2 additions & 1 deletion zigpy_znp/tools/flash_read.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import async_timeout

import zigpy_znp.commands as c
from zigpy_znp import async_utils
from zigpy_znp.api import ZNP
from zigpy_znp.config import CONFIG_SCHEMA
from zigpy_znp.tools.common import ClosableFileType, setup_parser
Expand Down Expand Up @@ -87,4 +88,4 @@ async def main(argv):


if __name__ == "__main__":
asyncio.run(main(sys.argv[1:])) # pragma: no cover
async_utils.run_in_worker_loop(main(sys.argv[1:])) # pragma: no cover
3 changes: 2 additions & 1 deletion zigpy_znp/tools/flash_write.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

import zigpy_znp.types as t
import zigpy_znp.commands as c
from zigpy_znp import async_utils
from zigpy_znp.api import ZNP
from zigpy_znp.config import CONFIG_SCHEMA
from zigpy_znp.tools.common import ClosableFileType, setup_parser
Expand Down Expand Up @@ -174,4 +175,4 @@ async def main(argv):


if __name__ == "__main__":
asyncio.run(main(sys.argv[1:])) # pragma: no cover
async_utils.run_in_worker_loop(main(sys.argv[1:])) # pragma: no cover
6 changes: 4 additions & 2 deletions zigpy_znp/tools/network_backup.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@

import sys
import json
import asyncio
import logging
import datetime

import zigpy.state

import zigpy_znp
import zigpy_znp.types as t
from zigpy_znp import async_utils
from zigpy_znp.api import ZNP
from zigpy_znp.tools.common import ClosableFileType, setup_parser, validate_backup_json
from zigpy_znp.zigbee.application import ControllerApplication
Expand Down Expand Up @@ -117,6 +117,8 @@ async def main(argv: list[str]) -> None:

f.write(json.dumps(backup_obj, indent=4))

LOGGER.info("done")


if __name__ == "__main__":
asyncio.run(main(sys.argv[1:])) # pragma: no cover
async_utils.run_in_worker_loop(main(sys.argv[1:])) # pragma: no cover
4 changes: 2 additions & 2 deletions zigpy_znp/tools/network_restore.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@

import sys
import json
import asyncio

import zigpy.state
import zigpy.zdo.types as zdo_t

import zigpy_znp.const as const
import zigpy_znp.types as t
from zigpy_znp import async_utils
from zigpy_znp.api import ZNP
from zigpy_znp.tools.common import ClosableFileType, setup_parser, validate_backup_json
from zigpy_znp.zigbee.application import ControllerApplication
Expand Down Expand Up @@ -130,4 +130,4 @@ async def main(argv: list[str]) -> None:


if __name__ == "__main__":
asyncio.run(main(sys.argv[1:])) # pragma: no cover
async_utils.run_in_worker_loop(main(sys.argv[1:])) # pragma: no cover
4 changes: 2 additions & 2 deletions zigpy_znp/tools/network_scan.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import sys
import time
import asyncio
import logging
import itertools

import zigpy_znp.types as t
import zigpy_znp.commands as c
from zigpy_znp import async_utils
from zigpy_znp.api import ZNP
from zigpy_znp.config import CONFIG_SCHEMA
from zigpy_znp.types.nvids import OsalNvIds
Expand Down Expand Up @@ -155,4 +155,4 @@ async def main(argv):


if __name__ == "__main__":
asyncio.run(main(sys.argv[1:])) # pragma: no cover
async_utils.run_in_worker_loop(main(sys.argv[1:])) # pragma: no cover
4 changes: 2 additions & 2 deletions zigpy_znp/tools/nvram_read.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import sys
import json
import asyncio
import logging

import zigpy_znp.types as t
from zigpy_znp import async_utils
from zigpy_znp.api import ZNP
from zigpy_znp.config import CONFIG_SCHEMA
from zigpy_znp.exceptions import SecurityError, CommandNotRecognized
Expand Down Expand Up @@ -96,4 +96,4 @@ async def main(argv):


if __name__ == "__main__":
asyncio.run(main(sys.argv[1:])) # pragma: no cover
async_utils.run_in_worker_loop(main(sys.argv[1:])) # pragma: no cover
4 changes: 2 additions & 2 deletions zigpy_znp/tools/nvram_reset.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import sys
import asyncio
import logging

from zigpy_znp import async_utils
from zigpy_znp.api import ZNP
from zigpy_znp.config import CONFIG_SCHEMA
from zigpy_znp.types.nvids import (
Expand Down Expand Up @@ -79,4 +79,4 @@ async def main(argv):


if __name__ == "__main__":
asyncio.run(main(sys.argv[1:])) # pragma: no cover
async_utils.run_in_worker_loop(main(sys.argv[1:])) # pragma: no cover
4 changes: 2 additions & 2 deletions zigpy_znp/tools/nvram_write.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import sys
import json
import asyncio
import logging

from zigpy_znp import async_utils
from zigpy_znp.api import ZNP
from zigpy_znp.config import CONFIG_SCHEMA
from zigpy_znp.types.nvids import ExNvIds, OsalNvIds
Expand Down Expand Up @@ -68,4 +68,4 @@ async def main(argv):


if __name__ == "__main__":
asyncio.run(main(sys.argv[1:])) # pragma: no cover
async_utils.run_in_worker_loop(main(sys.argv[1:])) # pragma: no cover
Loading