Source code for libkirk.sut

"""
.. module:: sut
    :platform: Linux
    :synopsis: module implementing SUT

.. moduleauthor:: Andrea Cervesato <andrea.cervesato@suse.com>
"""

import asyncio
import re
from typing import (
    Dict,
    List,
    Optional,
    Tuple,
)

import libkirk
import libkirk.plugin
from libkirk.com import (
    ComChannel,
    IOBuffer,
)
from libkirk.data import Test
from libkirk.errors import SUTError
from libkirk.plugin import Plugin

# discovered SUT implementations
_SUT = []


[docs] class SUT(Plugin): """ SUT abstraction class. It could be a remote host, a local host, a virtual machine instance, or any complex system we want to test. """ TAINTED_MSG = [ "proprietary module was loaded", "module was force loaded", "kernel running on an out of specification system", "module was force unloaded", "processor reported a Machine Check Exception (MCE)", "bad page referenced or some unexpected page flags", "taint requested by userspace application", "kernel died recently, i.e. there was an OOPS or BUG", "ACPI table overridden by user", "kernel issued warning", "staging driver was loaded", "workaround for bug in platform firmware applied", "externally-built (“out-of-tree”) module was loaded", "unsigned module was loaded", "soft lockup occurred", "kernel has been live patched", "auxiliary taint, defined for and used by distros", "kernel was built with the struct randomization plugin", ] FAULT_INJECTION_FILES = [ "fail_io_timeout", "fail_make_request", "fail_page_alloc", "failslab", ] _optimize = False
[docs] def get_channel(self) -> ComChannel: """ :return: Main channel to communicated with SUT. :rtype: ComChannel """ raise NotImplementedError()
[docs] async def start(self, iobuffer: Optional[IOBuffer] = None) -> None: """ Start the SUT. :param iobuffer: IO channel where to write stdout. :type iobuffer: IOBuffer """ raise NotImplementedError()
[docs] async def stop(self, iobuffer: Optional[IOBuffer] = None) -> None: """ Stop the SUT. :param iobuffer: IO channel where to write stdout. :type iobuffer: IOBuffer """ raise NotImplementedError()
[docs] async def restart(self, iobuffer: Optional[IOBuffer] = None) -> None: """ Restart the SUT. :param iobuffer: IO channel where to write stdout. :type iobuffer: IOBuffer """ raise NotImplementedError()
[docs] async def is_running(self) -> bool: """ :return: True if system under test is up and running. False otherwise. :rtype: bool """ raise NotImplementedError()
@property def optimize(self) -> bool: """ Optimize the commands execution by applying parallelization when it's available. :return: True if SUT will optimize commands execution on SUT. """ return self._optimize @optimize.setter def optimize(self, value: bool) -> None: """ Set commands execution optimization. """ self._optimize = value async def _run_cmd(self, cmd: str) -> str: """ Run command, check for returncode and return command's stdout. """ stdout = "unknown" try: channel = self.get_channel() ret = await asyncio.wait_for(channel.run_command(cmd), 1.5) if ret and ret["returncode"] == 0: stdout = ret["stdout"].rstrip() except asyncio.TimeoutError: pass return stdout async def _get_distro(self) -> str: """ Return the distro name. """ return await self._run_cmd('. /etc/os-release && echo "$ID"') async def _get_distro_ver(self) -> str: """ Return the distro version. """ return await self._run_cmd('. /etc/os-release && echo "$VERSION_ID"') async def _get_kernel(self) -> str: """ Return the kernel name. """ return await self._run_cmd("uname -s -r -v") async def _get_cmdline(self) -> str: """ Return /proc/cmdline content. """ return await self._run_cmd("cat /proc/cmdline") async def _get_arch(self) -> str: """ Return the architecture name. """ return await self._run_cmd("uname -m") async def _get_cpu(self) -> str: """ Return the CPU name. """ return await self._run_cmd("uname -p") async def _get_meminfo(self) -> str: """ Return the memory information. """ return await self._run_cmd("cat /proc/meminfo")
[docs] async def get_info(self) -> Dict[str, str]: """ Return SUT information. :return: Dictionary containing the SUT information in the form of: .. code-block:: python { "distro": str, "distro_ver": str, "kernel": str, "cmdline": str, "arch": str, "cpu" : str, "swap" : str, "ram" : str, } :rtype: dict """ if not await self.is_running(): raise SUTError("SUT is not running") distro = "" distro_ver = "" kernel = "" cmdline = "" arch = "" cpu = "" meminfo = "" if self.optimize: # pyrefly: ignore[bad-unpacking] ( distro, distro_ver, kernel, cmdline, arch, cpu, meminfo, ) = await asyncio.gather( *[ self._get_distro(), self._get_distro_ver(), self._get_kernel(), self._get_cmdline(), self._get_arch(), self._get_cpu(), self._get_meminfo(), ] ) else: distro = await self._get_distro() distro_ver = await self._get_distro_ver() kernel = await self._get_kernel() cmdline = await self._get_cmdline() arch = await self._get_arch() cpu = await self._get_cpu() meminfo = await self._get_meminfo() memory = "unknown" swap = "unknown" if meminfo: mem_m = re.search(r"MemTotal:\s+(?P<memory>\d+\s+kB)", meminfo) if mem_m: memory = mem_m.group("memory") swap_m = re.search(r"SwapTotal:\s+(?P<swap>\d+\s+kB)", meminfo) if swap_m: swap = swap_m.group("swap") ret = { "distro": distro, "distro_ver": distro_ver, "kernel": kernel, "cmdline": cmdline, "arch": arch, "cpu": cpu, "ram": memory, "swap": swap, } return ret
# Lazily initialised inside get_tainted_info() so that the # asyncio primitives are always created on the *running* loop. # Class-level assignment would bind them to the loop that was # current at import time, which breaks on Python 3.7-3.9 when # tests run on a different loop than the session loop. _tainted_lock: Optional[asyncio.Lock] = None _tainted_status: Optional[asyncio.Queue] = None
[docs] async def get_tainted_info(self) -> Tuple[int, List[str]]: """ Return information about kernel if tainted. :return: A tuple containing tainted information. First element is the tainted code and the second element is the tainted message. :rtype: (int, list[str]) """ if not await self.is_running(): raise SUTError("SUT is not running") # Initialise on first use, always inside a running loop. if self._tainted_lock is None: self._tainted_lock = asyncio.Lock() if self._tainted_status is None: self._tainted_status = asyncio.Queue(maxsize=1) if self._tainted_lock.locked(): try: status = self._tainted_status.get_nowait() return status except asyncio.QueueEmpty: pass async with self._tainted_lock: channel = self.get_channel() ret = await channel.run_command("cat /proc/sys/kernel/tainted") if not ret or ret["returncode"] != 0: raise SUTError("Can't read tainted kernel information") tainted_num = len(self.TAINTED_MSG) code = ret["stdout"].strip() # output is likely message in stderr if not code.isdigit(): raise SUTError(code) code = int(code) bits = format(code, f"0{tainted_num}b")[::-1] messages = [] for i in range(0, tainted_num): if bits[i] == "1": msg = self.TAINTED_MSG[i] messages.append(msg) try: self._tainted_status.get_nowait() except asyncio.QueueEmpty: pass await self._tainted_status.put((code, messages)) return code, messages
[docs] async def logged_as_root(self) -> bool: """ :return: True if we are logged as root inside the SUT. False otherwise. :rtype: bool """ if not await self.is_running(): raise SUTError("SUT is not running") channel = self.get_channel() ret = await channel.run_command("id -u") if not ret or ret["returncode"] != 0: raise SUTError("Can't determine if we are running as root") val = ret["stdout"].rstrip() user_id = 100 try: user_id = int(val) except ValueError as err: raise SUTError(f"'id -u' returned {val}") from err return user_id == 0
[docs] async def is_fault_injection_enabled(self) -> bool: """ :return: True if fault injection is enabled in the kernel. False otherwise. :rtype: bool """ if not await self.is_running(): raise SUTError("SUT is not running") channel = self.get_channel() for ftype in self.FAULT_INJECTION_FILES: ret = await channel.run_command(f"test -d /sys/kernel/debug/{ftype}") if ret and ret["returncode"] != 0: return False return True
[docs] async def setup_fault_injection( self, prob: int, interval: int = 1, ) -> None: """ Configure kernel fault injection. When prob is zero, the fault injection is set to default values. :param prob: Fault probability in between 0-100. :type prob: int :param interval: Fault interval. :type interval: int """ if not await self.is_running(): raise SUTError("SUT is not running") interval = 1 if prob == 0 else interval times = 1 if prob == 0 else -1 async def _set_value(value: int, path: str) -> None: """ Set the value to the path """ channel = self.get_channel() ret = await channel.run_command(f"echo {value} > {path}") if ret and ret["returncode"] != 0: raise SUTError(f"Can't setup {path}. {ret['stdout']}") for ftype in self.FAULT_INJECTION_FILES: path = f"/sys/kernel/debug/{ftype}" await _set_value(0, f"{path}/space") await _set_value(times, f"{path}/times") await _set_value(interval, f"{path}/interval") await _set_value(prob, f"{path}/probability")
[docs] class RedirectTestStdout(IOBuffer): """ Redirect test stdout data to UI events and save it. """ def __init__(self, test: Test) -> None: self.stdout = "" self._test = test
[docs] async def write(self, data: str) -> None: await libkirk.events.fire("test_stdout", self._test, data) self.stdout += data
[docs] class RedirectSUTStdout(IOBuffer): """ Redirect SUT stdout data to UI events. """ def __init__(self, sut: SUT, is_cmd: bool = False) -> None: self._sut = sut self._is_cmd = is_cmd
[docs] async def write(self, data: str) -> None: if self._is_cmd: await libkirk.events.fire("run_cmd_stdout", data) else: await libkirk.events.fire("sut_stdout", self._sut.name, data)
[docs] def discover(path: str, extend: bool = True) -> None: """ Discover all SUT implementations inside path. :param path: Directory where searching for SUT implementations. :type path: str :param extend: If True, it will add new discovered SUT on top of the ones already found. If False, previous discovered SUT will be cleared. :type param: bool """ global _SUT obj = libkirk.plugin.discover(SUT, path) if not extend: _SUT.clear() _SUT.extend(obj)
[docs] def get_suts() -> List[SUT]: """ :return: List of loaded SUT implementations. :rtype: list(SUT) """ global _SUT return _SUT