"""
.. module:: session
:platform: Linux
:synopsis: session declaration
.. moduleauthor:: Andrea Cervesato <andrea.cervesato@suse.com>
"""
import asyncio
import copy
import logging
import os
import random
import re
from typing import (
Dict,
List,
Optional,
)
import libkirk
import libkirk.types
from libkirk.data import Suite
from libkirk.errors import (
KirkException,
SessionError,
)
from libkirk.export import JSONExporter
from libkirk.io import AsyncFile
from libkirk.ltp import LTPFramework
from libkirk.results import TestResults
from libkirk.scheduler import SuiteScheduler
from libkirk.sut import (
SUT,
RedirectSUTStdout,
)
from libkirk.tempfile import TempDir
[docs]
class Session:
"""
The session runner.
"""
def __init__(
self,
tmpdir: TempDir,
sut: SUT,
exec_timeout: float = 3600.0,
suite_timeout: float = 3600.0,
workers: int = 1,
force_parallel: bool = False,
) -> None:
"""
:param tmpdir: Temporary directory.
:type tmpdir: TempDir
:param sut: SUT communication object.
:type sut: SUT
:param exec_timeout: Test timeout.
:type exec_timeout: float
:param suite_timeout: Testing suite timeout.
:type suite_timeout: float
:param workers: Number of workers for testing suite scheduler.
:type workers: int
:param force_parallel: Force parallel execution of all tests.
:type force_parallel: bool
"""
self._logger = logging.getLogger("kirk.session")
self._tmpdir = tmpdir
self._sut = sut
self._exec_timeout = exec_timeout
self._force_parallel = force_parallel
self._stop = False
self._exec_lock = asyncio.Lock()
self._run_lock = asyncio.Lock()
self._results = []
self._framework = LTPFramework(
timeout=self._exec_timeout,
)
self._scheduler = SuiteScheduler(
sut=self._sut,
framework=self._framework,
suite_timeout=suite_timeout,
exec_timeout=self._exec_timeout,
max_workers=workers,
)
self._curr_suite = ""
self._setup_debug_log()
self._setup_test_save()
if not self._sut.get_channel().parallel_execution:
self._logger.info(
"SUT doesn't support parallel execution. Forcing workers=1."
)
def _setup_debug_log(self) -> None:
"""
Set logging module so we save a log file with debugging information
inside the temporary path.
"""
if not self._tmpdir.abspath:
return
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
debug_file = os.path.join(self._tmpdir.abspath, "debug.log")
handler = logging.FileHandler(debug_file, encoding="utf8")
handler.setLevel(logging.DEBUG)
formatter = logging.Formatter(
"%(asctime)s - %(name)s:%(lineno)s - %(levelname)s - %(message)s"
)
handler.setFormatter(formatter)
logger.addHandler(handler)
def _setup_test_save(self) -> None:
"""
Setup event for complete test saving.
"""
if not self._tmpdir.abspath:
return
async def save_suite_started(suite: Suite) -> None:
self._curr_suite = suite.name
async def save_test_file(results: TestResults) -> None:
epath = os.path.join(self._tmpdir.abspath, "executed")
async with AsyncFile(epath, "a+") as efile:
await efile.write(f"{self._curr_suite}::{results.test.name}\n")
libkirk.events.register("suite_started", save_suite_started)
libkirk.events.register("test_completed", save_test_file)
async def _read_restored_session(self, path: str) -> Dict[str, List[str]]:
"""
Read restored session.
"""
data = {}
if not (path and os.path.exists(path)):
return data
epath = os.path.join(path, "executed")
if not os.path.exists(epath):
return data
self._logger.info("Reading previous executed tests")
async with AsyncFile(epath, "r") as efile:
async for line in efile:
if not line or "::" not in line:
continue
suite, test = line.split("::")
if not (suite and test):
continue
if suite not in data:
data[suite] = []
data[suite].append(test.rstrip())
self._logger.debug(data)
return data
async def _start_sut(self) -> None:
"""
Start communicating with SUT.
"""
await libkirk.events.fire("sut_start", self._sut.name)
await self._sut.start(iobuffer=RedirectSUTStdout(self._sut, False))
async def _stop_sut(self) -> None:
"""
Stop the SUT.
"""
if not await self._sut.is_running():
return
await libkirk.events.fire("sut_stop", self._sut.name)
await self._sut.stop(iobuffer=RedirectSUTStdout(self._sut, False))
async def _get_suites_objects(self, names: List[str]) -> List[Suite]:
"""
Return suites objects by giving their names.
"""
channel = self._sut.get_channel()
coros = [self._framework.find_suite(channel, suite) for suite in names]
if not coros:
raise KirkException(f"Can't find suites: {names}")
results = await asyncio.gather(*coros, return_exceptions=True)
for suite in results:
if isinstance(suite, Exception):
raise suite
if not suite:
raise KirkException("Can't find suite objects")
# pyrefly: ignore=no-matching-overload
return list(results)
async def _restore_tests(
self, suites_obj: list, restore_path: Optional[str] = None
) -> None:
"""
Remove all tests but the one which need to be restored.
"""
if not restore_path:
return
restored = await self._read_restored_session(restore_path)
if not restored:
return
await libkirk.events.fire("session_restore", restore_path)
for suite_obj in suites_obj:
suite = suite_obj.name
if suite not in restored:
continue
restored_set = set(restored[suite])
suite_obj.tests[:] = [
test for test in suite_obj.tests if test.name not in restored_set
]
@staticmethod
def _filter_tests(
suites_obj: List[Suite],
regex: Optional[str] = None,
when_matching: bool = False,
) -> None:
"""
Filter tests according to `regex`, if `when_matching` is True.
"""
if not regex:
return
matcher = re.compile(regex)
for suite_obj in suites_obj:
if when_matching:
# skip matching tests (keep non-matching)
suite_obj.tests[:] = [
test for test in suite_obj.tests if not matcher.search(test.name)
]
else:
# keep only matching tests
suite_obj.tests[:] = [
test for test in suite_obj.tests if matcher.search(test.name)
]
@staticmethod
def _apply_iterate(suites_obj: List[Suite], suite_iterate: int) -> List[Suite]:
"""
Return testing suites after applying iterate parameters.
"""
if suite_iterate <= 1:
return suites_obj
suites_list = []
for suite in suites_obj:
for i in range(suite_iterate):
obj = copy.deepcopy(suite)
obj.name = f"{suite.name}[{i}]"
suites_list.append(obj)
return suites_list
async def _read_suites(
self,
names: List[str],
pattern: Optional[str] = None,
skip_tests: Optional[str] = None,
restore_path: Optional[str] = None,
) -> List[Suite]:
"""
Read suites and return a list of Suite objects.
"""
suites_obj = await self._get_suites_objects(names)
await self._restore_tests(suites_obj, restore_path)
self._filter_tests(suites_obj, pattern, False)
self._filter_tests(suites_obj, skip_tests, True)
num_tests = sum(len(suite_obj.tests) for suite_obj in suites_obj)
if num_tests == 0:
raise KirkException("No tests selected")
if self._force_parallel:
for suite in suites_obj:
for test in suite.tests:
test.force_parallel()
return suites_obj
async def _exec_command(self, command: str) -> None:
"""
Execute a single command on SUT.
"""
async with self._exec_lock:
exc = None
try:
await libkirk.events.fire("run_cmd_start", command)
channel = self._sut.get_channel()
test = await self._framework.find_command(channel, command)
ret = await asyncio.wait_for(
channel.run_command(
test.full_command,
cwd=test.cwd,
env=test.env,
iobuffer=RedirectSUTStdout(self._sut, True),
),
timeout=self._exec_timeout,
)
if not ret:
raise SessionError(f"Can't execute command '{test.full_command}'")
await libkirk.events.fire(
"run_cmd_stop", command, ret["stdout"], ret["returncode"]
)
except asyncio.TimeoutError:
exc = KirkException(f"Command timeout: {repr(command)}")
except KirkException as err:
if not self._stop:
exc = err
if exc:
raise exc
async def _inner_stop(self) -> None:
"""
Stop scheduler and SUT.
"""
if self._scheduler:
await self._scheduler.stop()
await self._stop_sut()
[docs]
async def stop(self) -> None:
"""
Stop the current session.
"""
already_stopped = self._stop
self._stop = True
try:
await self._inner_stop()
async with self._run_lock:
pass
async with self._exec_lock:
pass
finally:
if not already_stopped:
await libkirk.events.fire("session_stopped")
self._stop = False
async def _schedule_once(self, suites_obj: List[Suite]) -> None:
"""
Schedule tests only once.
"""
await self._scheduler.schedule(suites_obj)
self._results.extend(self._scheduler.results)
async def _schedule_infinite(self, suites_obj: List[Suite]) -> None:
"""
Schedule all testing suites infinite times.
"""
count = 1
while not self._stop:
suites_iteration = []
for suite in copy.deepcopy(suites_obj):
suite.name = f"{suite.name}[{count}]"
suites_iteration.append(suite)
await self._schedule_once(suites_iteration)
if self._scheduler.stopped:
break
count += 1
async def _run_scheduler(self, suites_obj: List[Suite], runtime: float) -> None:
"""
Run the scheduler for specific amount of time given by `runtime`.
"""
if runtime <= 0:
await self._schedule_once(suites_obj)
return
try:
await asyncio.wait_for(self._schedule_infinite(suites_obj), runtime)
except asyncio.TimeoutError:
await self._scheduler.stop()
async def _apply_fault_injection(
self,
fault_prob: int,
fault_interval: int = 1,
) -> None:
"""
Check if we can apply fault injection configuration
and eventually does it.
"""
warn_msg = ""
if not await self._sut.logged_as_root():
if fault_prob != 0:
warn_msg = "Run as root to use kernel fault injection"
else:
if await self._sut.is_fault_injection_enabled():
await self._sut.setup_fault_injection(fault_prob, fault_interval)
else:
if fault_prob != 0:
warn_msg = "Fault injection is not enabled. Running tests normally"
if warn_msg:
self._logger.info(warn_msg)
await libkirk.events.fire("session_warning", warn_msg)
[docs]
async def run(
self,
command: Optional[str] = None,
suites: Optional[List[str]] = None,
pattern: Optional[str] = None,
skip_tests: Optional[str] = None,
report_path: Optional[str] = None,
restore_path: Optional[str] = None,
suite_iterate: int = 1,
randomize: bool = False,
runtime: float = 0,
fault_prob: int = 0,
fault_interval: int = 1,
) -> None:
"""
Run a new session and store results inside a JSON file.
:param command: Single command to run before suites.
:type command: str
:param suites: List of suites to execute.
:type suites: list
:param pattern: Regex pattern to include tests.
:type pattern: str
:param skip_tests: Regex for tests to skip.
:type skip_tests: str
:param report_path: JSON report path.
:type report_path: str
:param restore_path: Temporary directory generated by a previous session.
:type restore_path: str
:param suite_iterate: Execute all suites multiple times.
:type suite_iterate: int
:param randomize: Randomize all tests if True.
:type randomize: bool
:param runtime: For how long we want to run the session.
:type runtime: float
:param fault_prob: Fault injection probability.
:type fault_prob: int
:param fault_interval: Fault injection interval.
:type fault_interval: int
"""
async with self._run_lock:
await libkirk.events.fire("session_started", suites, self._tmpdir.abspath)
channel = self._sut.get_channel()
if not channel.parallel_execution:
await libkirk.events.fire(
"session_warning", "SUT doesn't support parallel execution"
)
try:
await self._start_sut()
if command:
await self._exec_command(command)
if fault_prob != 0:
await self._apply_fault_injection(fault_prob, fault_interval)
if suites:
suites_obj = await self._read_suites(
suites, pattern, skip_tests, restore_path
)
suites_obj = self._apply_iterate(suites_obj, suite_iterate)
if randomize:
for suite in suites_obj:
random.shuffle(suite.tests)
await self._run_scheduler(suites_obj, runtime)
except KirkException as err:
if not self._stop:
self._logger.exception(err)
await libkirk.events.fire("session_error", str(err))
raise err
finally:
try:
# configure fault injection to the original values
if fault_prob != 0:
await self._apply_fault_injection(0)
if self._results:
exporter = JSONExporter()
tasks = [
exporter.save_file(
self._results,
os.path.join(self._tmpdir.abspath, "results.json"),
)
]
if report_path:
tasks.append(exporter.save_file(self._results, report_path))
await asyncio.gather(*tasks)
except KirkException as err:
self._logger.exception(err)
await libkirk.events.fire("session_error", str(err))
raise err
finally:
self._results.clear()
await self._inner_stop()
await libkirk.events.fire(
"session_completed", self._scheduler.results
)