Source code for libkirk

"""
.. module:: __init__
    :platform: Linux
    :synopsis: application package definition

.. moduleauthor:: Andrea Cervesato <andrea.cervesato@suse.com>
"""

import asyncio
import sys
import typing
from typing import Callable

from libkirk.evt import EventsHandler

# Kirk version
__version__ = "4.1.0"


events = EventsHandler()


[docs] def get_event_loop() -> asyncio.AbstractEventLoop: """ Return the current asyncio event loop. """ try: return asyncio.get_running_loop() except (AttributeError, RuntimeError): pass try: return asyncio.get_event_loop() except RuntimeError: loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) return loop
def create_task(coro: typing.Coroutine) -> asyncio.Task: """ Create a new task. """ return get_event_loop().create_task(coro) def all_tasks(loop: asyncio.AbstractEventLoop) -> set: """ Return the list of all running tasks for the specific loop. """ # Maintain Python 3.6 compatibility if sys.version_info >= (3, 7): return asyncio.all_tasks(loop=loop) else: return asyncio.Task.all_tasks(loop=loop) def cancel_tasks(loop: asyncio.AbstractEventLoop) -> None: """ Cancel all asyncio running tasks. """ tasks = all_tasks(loop) if not tasks: return tasks_to_cancel = [] for task in tasks: if not (task.cancelled() or task.done()): task.cancel() tasks_to_cancel.append(task) # Maintain Python 3.6 compatibility if sys.version_info >= (3, 10): # pyrefly: ignore[bad-argument-type] loop.run_until_complete(asyncio.gather(*tasks, return_exceptions=True)) else: loop.run_until_complete( # pyrefly: ignore[bad-argument-type] asyncio.gather(*tasks, loop=loop, return_exceptions=True) ) for task in tasks_to_cancel: if not task.cancelled() and task.exception() is not None: loop.call_exception_handler( { "message": "unhandled exception during asyncio.run() shutdown", "exception": task.exception(), "task": task, } ) def to_thread(callback: Callable, *args: typing.Any) -> typing.Any: """ Run callback inside a thread. This is useful for blocking I/O operations. """ return get_event_loop().run_in_executor(None, callback, *args) __all__ = [ "events", "get_event_loop", ]