diff --git a/src/frequenz/dispatch/__init__.py b/src/frequenz/dispatch/__init__.py index 3676325..6435da9 100644 --- a/src/frequenz/dispatch/__init__.py +++ b/src/frequenz/dispatch/__init__.py @@ -3,76 +3,14 @@ """A highlevel interface for the dispatch API.""" -import grpc.aio -from frequenz.channels import Broadcast, Receiver -from frequenz.client.dispatch.types import Dispatch - -from frequenz.dispatch.actor import DispatchActor, DispatchEvent - -__all__ = ["Dispatcher"] - - -class Dispatcher: - """A highlevel interface for the dispatch API. - - This class provides a highlevel interface to the dispatch API. - It provides two channels: - - One that sends a dispatch event message whenever a dispatch is created, updated or deleted. - - The other sends a dispatch message whenever a dispatch is ready to be - executed according to the schedule. - - allows to receive new dispatches and ready dispatches. - - Example: - ```python - from frequenz.dispatch import Dispatcher - - async def run(): - dispatcher = Dispatcher(API_CONNECTION_INFO) - dispatcher.start() # this will start the actor - dispatch_arrived = dispatcher.new_dispatches().new_receiver() - dispatch_ready = dispatcher.ready_dispatches().new_receiver() - ``` - """ - - def __init__( - self, microgrid_id: int, grpc_channel: grpc.aio.Channel, svc_addr: str - ): - """Initialize the dispatcher. - - Args: - microgrid_id: The microgrid id. - grpc_channel: The gRPC channel. - svc_addr: The service address. - """ - self._ready_channel = Broadcast[Dispatch]("ready_dispatches") - self._updated_channel = Broadcast[DispatchEvent]("new_dispatches") - self._actor = DispatchActor( - microgrid_id, - grpc_channel, - svc_addr, - self._updated_channel.new_sender(), - self._ready_channel.new_sender(), - ) - - async def start(self) -> None: - """Start the actor.""" - self._actor.start() - - def updated_dispatches(self) -> Receiver[DispatchEvent]: - """Return new, updated or deleted dispatches receiver. - - Returns: - A new receiver for new dispatches. - """ - return self._updated_channel.new_receiver() - - def ready_dispatches(self) -> Receiver[Dispatch]: - """Return ready dispatches receiver. - - Returns: - A new receiver for ready dispatches. - """ - return self._ready_channel.new_receiver() +from frequenz.dispatch._dispatcher import Dispatcher, ReceiverFetcher +from frequenz.dispatch._event import Created, Deleted, DispatchEvent, Updated + +__all__ = [ + "Created", + "Deleted", + "DispatchEvent", + "Dispatcher", + "ReceiverFetcher", + "Updated", +] diff --git a/src/frequenz/dispatch/_dispatcher.py b/src/frequenz/dispatch/_dispatcher.py new file mode 100644 index 0000000..31ec180 --- /dev/null +++ b/src/frequenz/dispatch/_dispatcher.py @@ -0,0 +1,140 @@ +# License: MIT +# Copyright © 2024 Frequenz Energy-as-a-Service GmbH + +"""A highlevel interface for the dispatch API.""" + +import abc +from typing import Protocol, TypeVar + +import grpc.aio +from frequenz.channels import Broadcast, Receiver +from frequenz.client.dispatch.types import Dispatch + +from frequenz.dispatch._event import DispatchEvent +from frequenz.dispatch.actor import DispatchingActor + +ReceivedT = TypeVar("ReceivedT") +"""The type being received.""" + + +class ReceiverFetcher(Protocol[ReceivedT]): + """An interface that just exposes a `new_receiver` method.""" + + @abc.abstractmethod + def new_receiver( + self, name: str | None = None, maxsize: int = 50 + ) -> Receiver[ReceivedT]: + """Get a receiver from the channel. + + Args: + name: A name to identify the receiver in the logs. + maxsize: The maximum size of the receiver. + + Returns: + A receiver instance. + """ + + +class Dispatcher: + """A highlevel interface for the dispatch API. + + This class provides a highlevel interface to the dispatch API. + It provides two channels: + + One that sends a dispatch event message whenever a dispatch is created, updated or deleted. + + The other sends a dispatch message whenever a dispatch is ready to be + executed according to the schedule. + + allows to receive new dispatches and ready dispatches. + + Example: Processing ready-to-execute dispatches + ```python + import grpc.aio + + async def run(): + grpc_channel = grpc.aio.insecure_channel("localhost:50051") + microgrid_id = 1 + service_address = "localhost:50051" + + dispatcher = Dispatcher(microgrid_id, grpc_channel, service_address) + dispatcher.start() # this will start the actor + + ready_receiver = dispatcher.ready_to_execute.new_receiver() + + async for dispatch in ready_receiver: + print(f"Executing dispatch {dispatch.id}, due on {dispatch.start_time}") + # execute the dispatch + ``` + + Example: Getting notification about dispatch lifecycle events + ```python + from typing import assert_never + + import grpc.aio + from frequenz.dispatch import Created, Deleted, Dispatcher, Updated + + + async def run(): + grpc_channel = grpc.aio.insecure_channel("localhost:50051") + microgrid_id = 1 + service_address = "localhost:50051" + dispatcher = Dispatcher(microgrid_id, grpc_channel, service_address) + dispatcher.start() # this will start the actor + + events_receiver = dispatcher.lifecycle_events.new_receiver() + + async for event in events_receiver: + match event: + case Created(dispatch): + print(f"A dispatch was created: {dispatch}") + case Deleted(dispatch): + print(f"A dispatch was deleted: {dispatch}") + case Updated(dispatch): + print(f"A dispatch was updated: {dispatch}") + case _ as unhandled: + assert_never(unhandled) + ``` + """ + + def __init__( + self, microgrid_id: int, grpc_channel: grpc.aio.Channel, svc_addr: str + ): + """Initialize the dispatcher. + + Args: + microgrid_id: The microgrid id. + grpc_channel: The gRPC channel. + svc_addr: The service address. + """ + self._ready_channel = Broadcast[Dispatch]("ready_dispatches") + self._updated_channel = Broadcast[DispatchEvent]("new_dispatches") + self._actor = DispatchingActor( + microgrid_id, + grpc_channel, + svc_addr, + self._updated_channel.new_sender(), + self._ready_channel.new_sender(), + ) + + async def start(self) -> None: + """Start the actor.""" + self._actor.start() + + @property + def lifecycle_events(self) -> ReceiverFetcher[DispatchEvent]: + """Return new, updated or deleted dispatches receiver. + + Returns: + A new receiver for new dispatches. + """ + return self._updated_channel + + @property + def ready_to_execute(self) -> ReceiverFetcher[Dispatch]: + """Return ready dispatches receiver. + + Returns: + A new receiver for ready dispatches. + """ + return self._ready_channel diff --git a/src/frequenz/dispatch/_event.py b/src/frequenz/dispatch/_event.py new file mode 100644 index 0000000..670c501 --- /dev/null +++ b/src/frequenz/dispatch/_event.py @@ -0,0 +1,40 @@ +# License: MIT +# Copyright © 2024 Frequenz Energy-as-a-Service GmbH + +"""Dispatch lifecycle events.""" + +from dataclasses import dataclass + +from frequenz.client.dispatch.types import Dispatch + + +@dataclass(frozen=True) +class Created: + """A dispatch created event.""" + + dispatch: Dispatch + """The dispatch that was created.""" + + +@dataclass(frozen=True) +class Updated: + """A dispatch updated event.""" + + dispatch: Dispatch + """The dispatch that was updated.""" + + +@dataclass(frozen=True) +class Deleted: + """A dispatch deleted event.""" + + dispatch: Dispatch + """The dispatch that was deleted.""" + + +DispatchEvent = Created | Updated | Deleted +"""Type that is sent over the channel for dispatch updates. + +This type is used to send dispatches that were created, updated or deleted +over the channel. +""" diff --git a/src/frequenz/dispatch/actor.py b/src/frequenz/dispatch/actor.py index 2e5ede4..0d186b3 100644 --- a/src/frequenz/dispatch/actor.py +++ b/src/frequenz/dispatch/actor.py @@ -5,17 +5,19 @@ import asyncio import logging -from dataclasses import dataclass from datetime import datetime, timedelta, timezone from typing import cast import grpc.aio from dateutil import rrule from frequenz.channels import Sender +from frequenz.channels.util import Timer from frequenz.client.dispatch import Client from frequenz.client.dispatch.types import Dispatch, Frequency, Weekday from frequenz.sdk.actor import Actor +from frequenz.dispatch._event import Created, Deleted, DispatchEvent, Updated + _MAX_AHEAD_SCHEDULE = timedelta(hours=5) """The maximum time ahead to schedule a dispatch. @@ -55,37 +57,7 @@ """To map from our Weekday enum to the dateutil library enum.""" -@dataclass(frozen=True) -class _DispatchEventBase: - dispatch: Dispatch - """The dispatch that this event is about. - - Objects of this base class are sent over the channel when a dispatch is - created, updated or deleted. - """ - - -class Created(_DispatchEventBase): - """Wraps a dispatch that was created.""" - - -class Updated(_DispatchEventBase): - """Wraps a dispatch that was updated.""" - - -class Deleted(_DispatchEventBase): - """Wraps a dispatch that was deleted.""" - - -DispatchEvent = Created | Updated | Deleted -"""Type that is sent over the channel for dispatch updates. - -This type is used to send dispatches that were created, updated or deleted -over the channel. -""" - - -class DispatchActor(Actor): +class DispatchingActor(Actor): """Dispatch actor. This actor is responsible for handling dispatches for a microgrid. @@ -122,14 +94,14 @@ def __init__( self._microgrid_id = microgrid_id self._updated_dispatch_sender = updated_dispatch_sender self._ready_dispatch_sender = ready_dispatch_sender - self._poll_interval = poll_interval + self._poll_timer = Timer.timeout(poll_interval) async def _run(self) -> None: """Run the actor.""" + self._poll_timer.reset() try: - while True: + async for _ in self._poll_timer: await self._fetch() - await asyncio.sleep(self._poll_interval.total_seconds()) except asyncio.CancelledError: for task in self._scheduled.values(): task.cancel() diff --git a/tests/test_frequenz_dispatch.py b/tests/test_frequenz_dispatch.py index 1d6b0df..fc12aa6 100644 --- a/tests/test_frequenz_dispatch.py +++ b/tests/test_frequenz_dispatch.py @@ -19,13 +19,8 @@ from frequenz.client.dispatch.types import Dispatch, Frequency from pytest import fixture -from frequenz.dispatch.actor import ( - Created, - Deleted, - DispatchActor, - DispatchEvent, - Updated, -) +from frequenz.dispatch import Created, Deleted, DispatchEvent, Updated +from frequenz.dispatch.actor import DispatchingActor # This method replaces the event loop for all tests in the file. @@ -55,7 +50,7 @@ def _now() -> datetime: class ActorTestEnv: """Test environment for the actor.""" - actor: DispatchActor + actor: DispatchingActor """The actor under test.""" updated_dispatches: Receiver[DispatchEvent] """The receiver for updated dispatches.""" @@ -90,7 +85,7 @@ async def send(self, msg: T) -> None: ready_dispatches = Broadcast[Dispatch]("ready_dispatches") microgrid_id = randint(1, 100) - actor = DispatchActor( + actor = DispatchingActor( microgrid_id=microgrid_id, grpc_channel=MagicMock(), svc_addr="localhost", @@ -228,7 +223,7 @@ async def test_dispatch_schedule( await actor_env.client.create(**to_create_params(sample)) dispatch = actor_env.client.dispatches[0] - next_run = DispatchActor.calculate_next_run(dispatch, _now()) + next_run = DispatchingActor.calculate_next_run(dispatch, _now()) assert next_run is not None fake_time.shift(next_run - _now() - timedelta(seconds=1))