Source code for libkirk.io

"""
.. module:: io
    :platform: Linux
    :synopsis: module for handling I/O blocking operations

.. moduleauthor:: Andrea Cervesato <andrea.cervesato@suse.com>
"""

from types import TracebackType
from typing import (
    IO,
    Any,
    AsyncContextManager,
    Optional,
    Type,
    Union,
)

import libkirk


class AsyncFile(AsyncContextManager):
    """
    Handle files in asynchronous way by running operations inside a separate
    thread.

    Every blocking call on the underlying file object is dispatched through
    ``libkirk.to_thread``. The object can be used as an asynchronous context
    manager (``async with``) and as an asynchronous iterator over the lines
    of the file (``async for``).
    """

    def __init__(self, filename: str, mode: str = "r") -> None:
        """
        :param filename: Location of the file to open.
        :type filename: str
        :param mode: Mode used to open the file.
        :type mode: str
        """
        self._filename = filename
        self._mode = mode
        self._file: Optional[IO[Any]] = None
        # Mode checks are computed once here instead of on every call.
        # Update modes ("w+", "a+", "x+") are readable even without "r".
        self._is_read_mode = "r" in mode or "+" in mode
        self._is_binary_mode = "b" in mode

    async def __aenter__(self) -> Any:
        """
        Open the file and return this object.
        """
        await self.open()
        return self

    async def __aexit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_value: Optional[BaseException],
        traceback: Optional[TracebackType],
    ) -> bool:
        """
        Close the file when leaving the ``async with`` block.

        :returns: Always False, so an exception raised inside the block
            keeps propagating to the caller.
        :rtype: bool
        """
        await self.close()
        # A true value here would tell the interpreter to swallow the
        # exception raised inside the `async with` body.
        return False

    def __aiter__(self) -> Any:
        """
        Return this object as its own asynchronous iterator.
        """
        return self

    async def __anext__(self) -> Union[str, bytes]:
        """
        Read the next line of the file.

        :returns: Next line of the file.
        :rtype: str | bytes
        :raises ValueError: if the file mode does not allow reading.
        :raises StopAsyncIteration: if the file is not open or there are no
            more lines to read.
        """
        if not self._is_read_mode:
            raise ValueError("File must be open in read mode")

        if not self._file:
            # Nothing to iterate over. Returning None here would make
            # `async for` loop forever.
            raise StopAsyncIteration()

        line = await libkirk.to_thread(self._file.readline)
        if not line:
            raise StopAsyncIteration()

        return line

    async def open(self) -> None:
        """
        Open the file according to the mode. Do nothing if the file is
        already open.
        """
        if self._file:
            return

        def _open() -> IO[Any]:
            # Binary mode does not accept an encoding argument.
            if self._is_binary_mode:
                return open(self._filename, self._mode)

            return open(self._filename, self._mode, encoding="utf-8")

        self._file = await libkirk.to_thread(_open)

    async def close(self) -> None:
        """
        Close the file. Do nothing if the file is not open.
        """
        if not self._file:
            return

        try:
            await libkirk.to_thread(self._file.close)
        finally:
            # Drop the handle even when close() fails, so the object does
            # not keep pointing to an unusable file.
            self._file = None

    async def seek(self, pos: int) -> None:
        """
        Asynchronous version of `seek()`. Do nothing if the file is not open.

        :param pos: Position to search.
        :type pos: int
        """
        if not self._file:
            return

        await libkirk.to_thread(self._file.seek, pos)

    async def tell(self) -> Optional[int]:
        """
        Asynchronous version of `tell()`.

        :return: Current file position or None if file is not open.
        :rtype: int | None
        """
        if not self._file:
            return None

        return await libkirk.to_thread(self._file.tell)

    async def read(self, size: int = -1) -> Optional[Union[str, bytes]]:
        """
        Asynchronous version of `read()`.

        :param size: Amount of data to read. Default is -1, that means all
            data available.
        :type size: int
        :returns: Data that has been read or None if file is not open.
        :rtype: str | bytes | None
        """
        if not self._file:
            return None

        return await libkirk.to_thread(self._file.read, size)

    async def readline(self) -> Optional[Union[str, bytes]]:
        """
        Asynchronous version of `readline()`.

        :returns: Data that has been read or None if file is not open.
        :rtype: str | bytes | None
        """
        if not self._file:
            return None

        return await libkirk.to_thread(self._file.readline)

    async def write(self, data: Union[str, bytes]) -> None:
        """
        Asynchronous version of `write()`. Do nothing if the file is not
        open.

        :param data: Data to write inside file.
        :type data: str | bytes
        """
        if not self._file:
            return

        await libkirk.to_thread(self._file.write, data)