Source code for libkirk.evt

"""
.. module:: events
    :platform: Linux
    :synopsis: events handler implementation module

.. moduleauthor:: Andrea Cervesato <andrea.cervesato@suse.com>
"""

import asyncio
import logging
from typing import (
    Any,
    Callable,
    Dict,
    List,
    Optional,
)


[docs] class Event: """ An event to process. """ def __init__(self, ordered: bool = False) -> None: """ :param ordered: True if coroutines must be processed in order. :type ordered: bool """ self._coros = [] self._ordered = ordered
[docs] def remove(self, coro: Callable) -> None: """ Remove a specific Callable associated to the event. :param coro: Callable to remove. :type coro: Callable """ try: self._coros.remove(coro) except ValueError: pass
[docs] def has_coros(self) -> bool: """ Check if there are still available registrations. :return: True if there are registered coroutines. :rtype: bool """ return bool(self._coros)
[docs] def register(self, coro: Callable) -> None: """ Register a new Callable. :param coro: Coroutine to register. :type coro: Callable """ self._coros.append(coro)
[docs] def create_tasks(self, *args: Any, **kwargs: Any) -> List[Any]: """ Create tasks to run according to registered coroutines. :param args: Arguments to be passed to callback functions execution. :type args: list :param kwargs: Keyword arguments to be passed to callback functions execution. :type kwargs: dict :return: List of tasks to execute. :rtype: list(asyncio.Task) """ if self._ordered: return [coro(*args, **kwargs) for coro in self._coros] tasks = [coro(*args, **kwargs) for coro in self._coros] return [asyncio.gather(*tasks)]
[docs] class EventsHandler: """ This class implements event loop and events handling. """ def __init__(self) -> None: self._logger = logging.getLogger("kirk.events") self._tasks = asyncio.Queue() self._lock = asyncio.Lock() self._events: Dict[str, Event] = {} self._stop = False # register a default event used to notify internal # errors in the our application self._events["internal_error"] = Event() def _get_event(self, name: str) -> Optional[Event]: """ Return an event according to its name. """ return self._events.get(name, None)
[docs] def reset(self) -> None: """ Reset the entire events queue. """ self._logger.info("Reset events queue") self._events.clear()
[docs] def is_registered(self, event_name: str) -> bool: """ Returns True if event_name is registered. :param event_name: Name of the event. :type event_name: str :return: True if registered, False otherwise. :rtype: bool """ if not event_name: raise ValueError("event_name is empty") evt = self._get_event(event_name) return evt.has_coros() if evt else False
[docs] def register(self, event_name: str, coro: Callable, ordered: bool = False) -> None: """ Register an event with event_name. :param event_name: Name of the event. :type event_name: str :param coro: Callable associated with event_name. :type coro: Callable :param ordered: If True, the event will raise coroutines in the order they arrive. :type ordered: bool """ if not event_name: raise ValueError("event_name is empty") if not coro: raise ValueError("coro is empty") self._logger.info("Register event: %s", repr(event_name)) evt = self._events.setdefault(event_name, Event(ordered=ordered)) evt.register(coro)
[docs] def unregister(self, event_name: str, coro: Callable) -> None: """ Unregister a single event Callable with event_name. If coro is None, all coroutines registered will be removed. :param event_name: Name of the event. :type event_name: str :param coro: Callable to unregister. :type coro: Callable """ if not event_name: raise ValueError("event_name is empty") if not self.is_registered(event_name): raise ValueError(f"{event_name} is not registered") self._logger.info("Unregister event: %s -> %s", repr(event_name), repr(coro)) if coro: self._events[event_name].remove(coro) else: del self._events[event_name]
[docs] async def fire(self, event_name: str, *args: Any, **kwargs: Any) -> None: """ Fire a specific event. :param event_name: Name of the event. :type event_name: str :param args: Arguments to be passed to callback functions execution. :type args: Any :param kwargs: Keyword arguments to be passed to callback functions execution. :type kwargs: Any """ if not event_name: raise ValueError("event_name is empty") evt = self._get_event(event_name) if not evt: return for task in evt.create_tasks(*args, **kwargs): await self._tasks.put(task)
async def _consume(self) -> None: """ Consume the next event. """ # asyncio.queue::get() will wait until an item is available # without blocking the application task = await self._tasks.get() if not task: return try: await task except asyncio.CancelledError: pass except Exception as err: if "internal_error" not in self._events: return self._logger.info("Exception catched") self._logger.error(err) ievt = self._get_event("internal_error") if ievt: name = getattr(task, 'get_name', lambda: getattr(task, '__qualname__', str(task)))() err_tasks = ievt.create_tasks([err], name) await asyncio.gather(*err_tasks) finally: self._tasks.task_done()
[docs] async def stop(self) -> None: """ Stop the event loop. """ self._logger.info("Stopping event loop") self._stop = True # indicate producer is done await self._tasks.put(None) async with self._lock: pass # consume the last tasks while not self._tasks.empty(): await self._consume() self._logger.info("Event loop stopped")
[docs] async def start(self) -> None: """ Start the event loop. """ self._stop = False try: async with self._lock: self._logger.info("Starting event loop") while not self._stop: await self._consume() self._logger.info("Event loop completed") except asyncio.CancelledError: await self.stop()