"""
.. module:: com
:platform: Linux
:synopsis: communication class definition
.. moduleauthor:: Andrea Cervesato <andrea.cervesato@suse.com>
"""
from typing import (
Any,
Dict,
List,
Optional,
)
import libkirk.plugin
from libkirk.errors import (
KirkException,
PluginError,
)
from libkirk.plugin import Plugin
# discovered communication channels
_COM = []
[docs]
class IOBuffer:
"""
IO stdout buffer. The API is similar to IO types.
"""
[docs]
async def write(self, data: str) -> None:
"""
Write data inside the buffer.
:param data: Data to write.
:type data: str
"""
raise NotImplementedError()
[docs]
class ComChannel(Plugin):
"""
Communication channel. The objects implementing this class are usually
using SSH, serial, shell, etc protocols. and they are used by the scheduler
in order to execute commands or turning on/off the communication.
"""
@property
def parallel_execution(self) -> bool:
"""
:return: If True, communication supports commands parallel execution.
:rtype: bool
"""
raise NotImplementedError()
[docs]
async def active(self) -> bool:
"""
:return: Return True if communication is active. False otherwise.
:rtype: bool
"""
raise NotImplementedError()
[docs]
async def communicate(self, iobuffer: Optional[IOBuffer] = None) -> None:
"""
Start communication.
:param iobuffer: Buffer used to write stdout.
:type iobuffer: IOBuffer
"""
raise NotImplementedError()
[docs]
async def stop(self, iobuffer: Optional[IOBuffer] = None) -> None:
"""
Stop communication.
:param iobuffer: Buffer used to write stdout.
:type iobuffer: IOBuffer
"""
raise NotImplementedError()
[docs]
async def ping(self) -> float:
"""
Send a ping request and verify how much reply takes in seconds.
:return: Time between ping and pong.
:rtype: float
"""
raise NotImplementedError()
[docs]
async def run_command(
self,
command: str,
cwd: Optional[str] = None,
env: Optional[Dict[str, str]] = None,
iobuffer: Optional[IOBuffer] = None,
) -> Optional[Dict[str, Any]]:
"""
Run a command.
:param command: Command to execute.
:type command: str
:param cwd: Current working directory.
:type cwd: str
:param env: Environment variables.
:type env: dict
:param iobuffer: Buffer used to write stdout.
:type iobuffer: IOBuffer
:return: Dictionary containing information about the executed command.
.. code-block:: python
{
"command": <str>,
"returncode": <int>,
"stdout": <str>,
"exec_time": <float>,
}
If None is returned, then callback has failed.
:rtype: dict
"""
raise NotImplementedError()
[docs]
async def fetch_file(self, target_path: str) -> bytes:
"""
Fetch file and return its content.
:param target_path: Path of the file to download from target.
:type target_path: str
:return: Data contained in target_path.
:rtype: bytes
"""
raise NotImplementedError()
[docs]
async def ensure_communicate(
self, iobuffer: Optional[IOBuffer] = None, retries: int = 10
) -> None:
"""
Ensure that communicate is completed, retrying as many times we
want in case of KirkException error. After each error, the
communication is stopped and a new communication is performed.
:param iobuffer: Buffer used to write stdout.
:type iobuffer: IOBuffer
:param retries: Number of times we retry to communicate.
:type retries: int
"""
retries = max(retries, 1)
for retry in range(retries):
try:
await self.communicate(iobuffer=iobuffer)
break
except KirkException as err:
if retry >= retries - 1:
raise err
await self.stop(iobuffer=iobuffer)
[docs]
def discover(path: str, extend: bool = True) -> None:
"""
Discover all ComChannel implementations inside `path`.
:param path: Directory where searching for channel implementations.
:type path: str
:param extend: If True, it will add new discovered channels on top of the
ones already found. If False, previous discovered channels will be
cleared.
:rtype: bool
"""
global _COM
obj = libkirk.plugin.discover(ComChannel, path)
if not extend:
_COM.clear()
_COM.extend(obj)
[docs]
def get_channels() -> List[ComChannel]:
"""
:return: List of loaded ComChannel implementations.
:rtype: list(ComChannel)
"""
global _COM
# pyrefly: ignore[bad-return]
return _COM
[docs]
def clone_channel(name: str, new_name: str) -> Plugin:
"""
Clone a channel implementation named name and rename it with
new_name. The new plugin will be registered with the other
plugins.
:param name: Plugin name.
:type name: str
:param new_name: New cloned plugin name.
:type new_name: str
:return: New plugin object.
:rtype: Plugin
"""
global _COM
plugin = next((c for c in _COM if c.name == name), None)
if not plugin:
raise PluginError(f"Can't find plugin '{name}'")
channel = plugin.clone(new_name)
_COM.append(channel)
return channel