Source code for libkirk.session

"""
.. module:: session
    :platform: Linux
    :synopsis: session declaration

.. moduleauthor:: Andrea Cervesato <andrea.cervesato@suse.com>
"""

import asyncio
import copy
import logging
import os
import random
import re
from typing import (
    Dict,
    List,
    Optional,
)

import libkirk
import libkirk.types
from libkirk.data import Suite
from libkirk.errors import (
    KirkException,
    SessionError,
)
from libkirk.export import JSONExporter
from libkirk.io import AsyncFile
from libkirk.ltp import LTPFramework
from libkirk.results import TestResults
from libkirk.scheduler import SuiteScheduler
from libkirk.sut import (
    SUT,
    RedirectSUTStdout,
)
from libkirk.tempfile import TempDir


[docs] class Session: """ The session runner. """ def __init__( self, tmpdir: TempDir, sut: SUT, exec_timeout: float = 3600.0, suite_timeout: float = 3600.0, workers: int = 1, force_parallel: bool = False, ) -> None: """ :param tmpdir: Temporary directory. :type tmpdir: TempDir :param sut: SUT communication object. :type sut: SUT :param exec_timeout: Test timeout. :type exec_timeout: float :param suite_timeout: Testing suite timeout. :type suite_timeout: float :param workers: Number of workers for testing suite scheduler. :type workers: int :param force_parallel: Force parallel execution of all tests. :type force_parallel: bool """ self._logger = logging.getLogger("kirk.session") self._tmpdir = tmpdir self._sut = sut self._exec_timeout = exec_timeout self._force_parallel = force_parallel self._stop = False self._exec_lock = asyncio.Lock() self._run_lock = asyncio.Lock() self._results = [] self._framework = LTPFramework( timeout=self._exec_timeout, ) self._scheduler = SuiteScheduler( sut=self._sut, framework=self._framework, suite_timeout=suite_timeout, exec_timeout=self._exec_timeout, max_workers=workers, ) self._curr_suite = "" self._setup_debug_log() self._setup_test_save() if not self._sut.get_channel().parallel_execution: self._logger.info( "SUT doesn't support parallel execution. Forcing workers=1." ) def _setup_debug_log(self) -> None: """ Set logging module so we save a log file with debugging information inside the temporary path. """ if not self._tmpdir.abspath: return logger = logging.getLogger() logger.setLevel(logging.DEBUG) debug_file = os.path.join(self._tmpdir.abspath, "debug.log") handler = logging.FileHandler(debug_file, encoding="utf8") handler.setLevel(logging.DEBUG) formatter = logging.Formatter( "%(asctime)s - %(name)s:%(lineno)s - %(levelname)s - %(message)s" ) handler.setFormatter(formatter) logger.addHandler(handler) def _setup_test_save(self) -> None: """ Setup event for complete test saving. """ if not self._tmpdir.abspath: return async def save_suite_started(suite: Suite) -> None: self._curr_suite = suite.name async def save_test_file(results: TestResults) -> None: epath = os.path.join(self._tmpdir.abspath, "executed") async with AsyncFile(epath, "a+") as efile: await efile.write(f"{self._curr_suite}::{results.test.name}\n") libkirk.events.register("suite_started", save_suite_started) libkirk.events.register("test_completed", save_test_file) async def _read_restored_session(self, path: str) -> Dict[str, List[str]]: """ Read restored session. """ data = {} if not (path and os.path.exists(path)): return data epath = os.path.join(path, "executed") if not os.path.exists(epath): return data self._logger.info("Reading previous executed tests") async with AsyncFile(epath, "r") as efile: async for line in efile: if not line or "::" not in line: continue suite, test = line.split("::") if not (suite and test): continue if suite not in data: data[suite] = [] data[suite].append(test.rstrip()) self._logger.debug(data) return data async def _start_sut(self) -> None: """ Start communicating with SUT. """ await libkirk.events.fire("sut_start", self._sut.name) await self._sut.start(iobuffer=RedirectSUTStdout(self._sut, False)) async def _stop_sut(self) -> None: """ Stop the SUT. """ if not await self._sut.is_running(): return await libkirk.events.fire("sut_stop", self._sut.name) await self._sut.stop(iobuffer=RedirectSUTStdout(self._sut, False)) async def _get_suites_objects(self, names: List[str]) -> List[Suite]: """ Return suites objects by giving their names. """ channel = self._sut.get_channel() coros = [self._framework.find_suite(channel, suite) for suite in names] if not coros: raise KirkException(f"Can't find suites: {names}") results = await asyncio.gather(*coros, return_exceptions=True) for suite in results: if isinstance(suite, Exception): raise suite if not suite: raise KirkException("Can't find suite objects") # pyrefly: ignore=no-matching-overload return list(results) async def _restore_tests( self, suites_obj: list, restore_path: Optional[str] = None ) -> None: """ Remove all tests but the one which need to be restored. """ if not restore_path: return restored = await self._read_restored_session(restore_path) if not restored: return await libkirk.events.fire("session_restore", restore_path) for suite_obj in suites_obj: suite = suite_obj.name if suite not in restored: continue restored_set = set(restored[suite]) suite_obj.tests[:] = [ test for test in suite_obj.tests if test.name not in restored_set ] @staticmethod def _filter_tests( suites_obj: List[Suite], regex: Optional[str] = None, when_matching: bool = False, ) -> None: """ Filter tests according to `regex`, if `when_matching` is True. """ if not regex: return matcher = re.compile(regex) for suite_obj in suites_obj: if when_matching: # skip matching tests (keep non-matching) suite_obj.tests[:] = [ test for test in suite_obj.tests if not matcher.search(test.name) ] else: # keep only matching tests suite_obj.tests[:] = [ test for test in suite_obj.tests if matcher.search(test.name) ] @staticmethod def _apply_iterate(suites_obj: List[Suite], suite_iterate: int) -> List[Suite]: """ Return testing suites after applying iterate parameters. """ if suite_iterate <= 1: return suites_obj suites_list = [] for suite in suites_obj: for i in range(suite_iterate): obj = copy.deepcopy(suite) obj.name = f"{suite.name}[{i}]" suites_list.append(obj) return suites_list async def _read_suites( self, names: List[str], pattern: Optional[str] = None, skip_tests: Optional[str] = None, restore_path: Optional[str] = None, ) -> List[Suite]: """ Read suites and return a list of Suite objects. """ suites_obj = await self._get_suites_objects(names) await self._restore_tests(suites_obj, restore_path) self._filter_tests(suites_obj, pattern, False) self._filter_tests(suites_obj, skip_tests, True) num_tests = sum(len(suite_obj.tests) for suite_obj in suites_obj) if num_tests == 0: raise KirkException("No tests selected") if self._force_parallel: for suite in suites_obj: for test in suite.tests: test.force_parallel() return suites_obj async def _exec_command(self, command: str) -> None: """ Execute a single command on SUT. """ async with self._exec_lock: exc = None try: await libkirk.events.fire("run_cmd_start", command) channel = self._sut.get_channel() test = await self._framework.find_command(channel, command) ret = await asyncio.wait_for( channel.run_command( test.full_command, cwd=test.cwd, env=test.env, iobuffer=RedirectSUTStdout(self._sut, True), ), timeout=self._exec_timeout, ) if not ret: raise SessionError(f"Can't execute command '{test.full_command}'") await libkirk.events.fire( "run_cmd_stop", command, ret["stdout"], ret["returncode"] ) except asyncio.TimeoutError: exc = KirkException(f"Command timeout: {repr(command)}") except KirkException as err: if not self._stop: exc = err if exc: raise exc async def _inner_stop(self) -> None: """ Stop scheduler and SUT. """ if self._scheduler: await self._scheduler.stop() await self._stop_sut()
[docs] async def stop(self) -> None: """ Stop the current session. """ already_stopped = self._stop self._stop = True try: await self._inner_stop() async with self._run_lock: pass async with self._exec_lock: pass finally: if not already_stopped: await libkirk.events.fire("session_stopped") self._stop = False
async def _schedule_once(self, suites_obj: List[Suite]) -> None: """ Schedule tests only once. """ await self._scheduler.schedule(suites_obj) self._results.extend(self._scheduler.results) async def _schedule_infinite(self, suites_obj: List[Suite]) -> None: """ Schedule all testing suites infinite times. """ count = 1 while not self._stop: suites_iteration = [] for suite in copy.deepcopy(suites_obj): suite.name = f"{suite.name}[{count}]" suites_iteration.append(suite) await self._schedule_once(suites_iteration) if self._scheduler.stopped: break count += 1 async def _run_scheduler(self, suites_obj: List[Suite], runtime: float) -> None: """ Run the scheduler for specific amount of time given by `runtime`. """ if runtime <= 0: await self._schedule_once(suites_obj) return try: await asyncio.wait_for(self._schedule_infinite(suites_obj), runtime) except asyncio.TimeoutError: await self._scheduler.stop() async def _apply_fault_injection( self, fault_prob: int, fault_interval: int = 1, ) -> None: """ Check if we can apply fault injection configuration and eventually does it. """ warn_msg = "" if not await self._sut.logged_as_root(): if fault_prob != 0: warn_msg = "Run as root to use kernel fault injection" else: if await self._sut.is_fault_injection_enabled(): await self._sut.setup_fault_injection(fault_prob, fault_interval) else: if fault_prob != 0: warn_msg = "Fault injection is not enabled. Running tests normally" if warn_msg: self._logger.info(warn_msg) await libkirk.events.fire("session_warning", warn_msg)
[docs] async def run( self, command: Optional[str] = None, suites: Optional[List[str]] = None, pattern: Optional[str] = None, skip_tests: Optional[str] = None, report_path: Optional[str] = None, restore_path: Optional[str] = None, suite_iterate: int = 1, randomize: bool = False, runtime: float = 0, fault_prob: int = 0, fault_interval: int = 1, ) -> None: """ Run a new session and store results inside a JSON file. :param command: Single command to run before suites. :type command: str :param suites: List of suites to execute. :type suites: list :param pattern: Regex pattern to include tests. :type pattern: str :param skip_tests: Regex for tests to skip. :type skip_tests: str :param report_path: JSON report path. :type report_path: str :param restore_path: Temporary directory generated by a previous session. :type restore_path: str :param suite_iterate: Execute all suites multiple times. :type suite_iterate: int :param randomize: Randomize all tests if True. :type randomize: bool :param runtime: For how long we want to run the session. :type runtime: float :param fault_prob: Fault injection probability. :type fault_prob: int :param fault_interval: Fault injection interval. :type fault_interval: int """ async with self._run_lock: await libkirk.events.fire("session_started", suites, self._tmpdir.abspath) channel = self._sut.get_channel() if not channel.parallel_execution: await libkirk.events.fire( "session_warning", "SUT doesn't support parallel execution" ) try: await self._start_sut() if command: await self._exec_command(command) if fault_prob != 0: await self._apply_fault_injection(fault_prob, fault_interval) if suites: suites_obj = await self._read_suites( suites, pattern, skip_tests, restore_path ) suites_obj = self._apply_iterate(suites_obj, suite_iterate) if randomize: for suite in suites_obj: random.shuffle(suite.tests) await self._run_scheduler(suites_obj, runtime) except KirkException as err: if not self._stop: self._logger.exception(err) await libkirk.events.fire("session_error", str(err)) raise err finally: try: # configure fault injection to the original values if fault_prob != 0: await self._apply_fault_injection(0) if self._results: exporter = JSONExporter() tasks = [ exporter.save_file( self._results, os.path.join(self._tmpdir.abspath, "results.json"), ) ] if report_path: tasks.append(exporter.save_file(self._results, report_path)) await asyncio.gather(*tasks) except KirkException as err: self._logger.exception(err) await libkirk.events.fire("session_error", str(err)) raise err finally: self._results.clear() await self._inner_stop() await libkirk.events.fire( "session_completed", self._scheduler.results )