Source code for libkirk.scheduler

"""
.. module:: runner
    :platform: Linux
    :synopsis: module containing Runner definition and implementation.

.. moduleauthor:: Andrea Cervesato <andrea.cervesato@suse.com>
"""

import asyncio
import enum
import logging
import os
import signal
import sys
import time
from typing import (
    Any,
    Dict,
    List,
    Optional,
)

import libkirk
import libkirk.data
from libkirk.data import (
    Suite,
    Test,
)
from libkirk.errors import (
    KernelPanicError,
    KernelTaintedError,
    KernelTimeoutError,
    KirkException,
    SchedulerError,
)
from libkirk.framework import Framework
from libkirk.results import (
    Results,
    ResultStatus,
    SuiteResults,
    TestResults,
)
from libkirk.sut import (
    SUT,
    RedirectSUTStdout,
    RedirectTestStdout,
)


[docs] class Scheduler: """ Schedule jobs to run on target. """ @property def results(self) -> List[Results]: """ Current results. It's reset before every `schedule` call and it's populated when a job completes the execution. :return: List of results. :rtype: list(Results) """ raise NotImplementedError() @property def stopped(self) -> bool: """ :return: True when scheduler has been stopped. False otherwise. :rtype: bool """ raise NotImplementedError()
[docs] async def stop(self) -> None: """ Stop all running jobs. """ raise NotImplementedError()
[docs] async def schedule(self, jobs: List[Any]) -> None: """ Schedule and execute a list of jobs. :param jobs: Object containing jobs definition :type jobs: list(object) """ raise NotImplementedError()
[docs] class TestStatus(enum.IntEnum): """ Status codes returned by test execution in the scheduler. """ OK = 0 TEST_TIMEOUT = 1 KERNEL_PANIC = 2 KERNEL_TAINTED = 3 KERNEL_TIMEOUT = 4
[docs] class TestScheduler(Scheduler): """ Schedule and run tests, taking into account status of the kernel during their execution, as well as tests timeout. """ def __init__( self, sut: SUT, framework: Framework, timeout: float = 0.0, max_workers: int = 1 ) -> None: """ :param sut: Object to communicate with SUT. :type sut: SUT :param framework: Framework handler. :type framework: Framework :param timeout: Timeout for tests execution. :type timeout: float :param max_workers: Maximum number of workers to schedule jobs. :type max_workers: int """ if not sut: raise ValueError("SUT object is empty") if not framework: raise ValueError("Framework object is empty") self._logger = logging.getLogger("kirk.test_scheduler") self._sut = sut self._framework = framework self._timeout = 0.0 if timeout < 0.0 else timeout self._max_workers = 1 if max_workers < 1 else max_workers self._results = [] self._stop_cnt = 0 self._stopped = False self._running_tests_sem = asyncio.Semaphore(1) self._schedule_lock = asyncio.Lock() async def _get_tainted_status(self) -> tuple: """ Check tainted status of the Kernel. """ code, messages = await self._sut.get_tainted_info() for msg in messages: if msg: self._logger.debug("Kernel tainted (%d): %s", code, msg) await libkirk.events.fire("kernel_tainted", msg) return code, messages async def _write_kmsg( self, test: Test, results: Optional[TestResults] = None ) -> None: """ If root, we write test information on /dev/kmsg. """ self._logger.info("Writing test information on /dev/kmsg") channel = self._sut.get_channel() ret = await channel.run_command("id -u") if not ret or ret["stdout"] != "0\n": self._logger.info("Can't write on /dev/kmsg from user") return if results: message = ( f"{sys.argv[0]}[{os.getpid()}]: " f"{test.name}: end (returncode: {results.return_code})\n" ) else: message = ( f"{sys.argv[0]}[{os.getpid()}]: " f"{test.name}: start (command: {test.full_command})\n" ) await channel.run_command(f'echo -n "{message}" > /dev/kmsg') @property def results(self) -> List[Results]: return self._results @property def stopped(self) -> bool: return self._stopped
[docs] async def stop(self) -> None: self._logger.info("Stopping tests execution") self._stop_cnt += 1 if self._stop_cnt > 1: # by stopping SUT first, we cause scheduler to complete # current test immediatelly without waiting await self._sut.stop() try: # we enter in the semaphore queue in order to get highest # priority in the tests queue and wait for the running tests # to be completed async with self._running_tests_sem: pass self._logger.info("Wait for running tests to be completed") async with self._schedule_lock: pass finally: self._stop_cnt = 0 self._stopped = True self._logger.info("All tests have been completed")
async def _run_test(self, test: Test) -> None: """ Run a single test and populate the results array. """ async with self._running_tests_sem: if self._stop_cnt > 0: self._logger.info("Test '%s' has been stopped", test.name) return self._logger.info("Running test %s", test.name) self._logger.debug(test) await libkirk.events.fire("test_started", test) await self._write_kmsg(test, None) iobuffer = RedirectTestStdout(test) cmd = test.full_command start_t = time.time() exec_time = 0 test_data: Dict[str, Any] = {} tainted_msg = None status = TestStatus.OK channel = self._sut.get_channel() try: tainted_code1, _ = await self._get_tainted_status() # pyrefly: ignore[bad-assignment] test_data = await asyncio.wait_for( channel.run_command( cmd, cwd=test.cwd, env=test.env, iobuffer=iobuffer ), timeout=self._timeout, ) if test_data is None: raise SchedulerError("Test command return None") tainted_code2, tainted_msg2 = await self._get_tainted_status() if tainted_code2 != tainted_code1: self._logger.info("Recognised Kernel tainted: %s", tainted_msg2) tainted_msg = tainted_msg2 status = TestStatus.KERNEL_TAINTED except KernelPanicError: exec_time = time.time() - start_t self._logger.info("Recognised Kernel panic") status = TestStatus.KERNEL_PANIC except asyncio.TimeoutError: exec_time = time.time() - start_t status = TestStatus.TEST_TIMEOUT self._logger.info("Got test timeout. Checking if SUT is still replying") try: await asyncio.wait_for(channel.ping(), timeout=10) self._logger.info("SUT replied") except asyncio.TimeoutError: status = TestStatus.KERNEL_TIMEOUT # create test results and save it if status not in [TestStatus.OK, TestStatus.KERNEL_TAINTED]: test_data = { "name": test.name, "command": test.full_command, "stdout": iobuffer.stdout, "returncode": -1, "exec_time": exec_time, } # we won't consider tests killed by kirk during forcibly stop, # but only if they have been killed by an external application # or kernel OOM if test_data["returncode"] == -signal.SIGKILL and self._stop_cnt > 1: self._logger.info("Test killed: %s", test.name) return results = await self._framework.read_result( test, test_data["stdout"], test_data["returncode"], test_data["exec_time"], ) self._logger.debug("results=%s", results) self._results.append(results) await libkirk.events.fire("test_completed", results) await self._write_kmsg(test, results) self._logger.info("Test completed: %s", test.name) self._logger.debug(results) # raise kernel errors at the end so we can collect test results if status == TestStatus.KERNEL_TAINTED: await libkirk.events.fire("kernel_tainted", tainted_msg) raise KernelTaintedError() if status == TestStatus.KERNEL_PANIC: await libkirk.events.fire("kernel_panic") raise KernelPanicError() if status == TestStatus.KERNEL_TIMEOUT: await libkirk.events.fire("sut_not_responding") raise KernelTimeoutError() async def _run_and_wait(self, tests: List[Test]) -> None: """ Run tests one after another. """ if not tests: return self._logger.info("Scheduling %d tests on single worker", len(tests)) self._running_tests_sem = asyncio.Semaphore(1) for test in tests: await self._run_test(test) async def _run_parallel(self, tests: List[Test]) -> None: """ Run tests in parallel. """ if not tests: return self._running_tests_sem = asyncio.Semaphore(self._max_workers) coros = [self._run_test(test) for test in tests] self._logger.info( "Scheduling %d tests on %d workers", len(coros), self._max_workers ) await asyncio.gather(*coros)
[docs] async def schedule(self, jobs: List[Any]) -> None: if not jobs: raise ValueError("jobs list is empty") for job in jobs: if not isinstance(job, Test): raise ValueError("jobs must be a list of Test") async with self._schedule_lock: self._logger.info("Check what tests can be run in parallel") self._results.clear() # Cache filtered lists to avoid redundant list comprehensions parallelizable_tests = [test for test in jobs if test.parallelizable] sequential_tests = [test for test in jobs if not test.parallelizable] try: if self._max_workers > 1: await self._run_parallel(parallelizable_tests) await self._run_and_wait(sequential_tests) else: await self._run_and_wait(jobs) except KirkException as err: exc_name = err.__class__.__name__ self._logger.info("%s caught during tests execution", exc_name) async with self._running_tests_sem: pass if self._stop_cnt == 0: self._logger.info("Propagating %s exception", exc_name) raise err
[docs] class SuiteScheduler(Scheduler): """ The Scheduler class implementation for suites execution. This is a special scheduler that schedules suites tests, checking for kernel status and rebooting SUT if we have some issues with it (i.e. kernel panic). """ def __init__( self, sut: SUT, framework: Framework, suite_timeout: float = 0.0, exec_timeout: float = 0.0, max_workers: int = 1, ) -> None: """ :param sut: Object used to communicate with SUT. :type sut: SUT :param framework: Framework handler. :type framework: Framework :param suite_timeout: Timeout before stopping testing suite. :type suite_timeout: float :param exec_timeout: Timeout before stopping single execution. :type exec_timeout: float :param max_workers: Maximum number of workers to schedule jobs. :type max_workers: int """ if not sut: raise ValueError("SUT is an empty object") if not framework: raise ValueError("Framework object is empty") suite_timeout = 0.0 if suite_timeout < 0.0 else suite_timeout exec_timeout = 0.0 if exec_timeout < 0.0 else exec_timeout max_workers = 1 if max_workers < 1 else max_workers self._logger = logging.getLogger("kirk.suite_scheduler") self._sut = sut self._framework = framework self._results = [] self._stop = False self._stopped = False self._schedule_lock = asyncio.Lock() self._reboot_lock = asyncio.Lock() self._sut_rebooted = False self._suite_timeout = suite_timeout self._scheduler = TestScheduler( sut=self._sut, framework=self._framework, timeout=exec_timeout, max_workers=max_workers, ) @property def results(self) -> List[Results]: return self._results @property def stopped(self) -> bool: return self._stopped
[docs] async def stop(self) -> None: self._logger.info("Stopping suites execution") self._stop = True try: await self._scheduler.stop() async with self._schedule_lock: pass finally: self._stop = False self._stopped = True self._logger.info("Suites execution has stopped")
async def _restart_sut(self) -> None: """ Restart the SUT after stopping the tests scheduling. """ async with self._reboot_lock: self._logger.info("Rebooting SUT") await libkirk.events.fire("sut_restart", self._sut.name) iobuffer = RedirectSUTStdout(self._sut) await self._scheduler.stop() await self._sut.restart(iobuffer=iobuffer) self._logger.info("SUT rebooted") async def _run_suite(self, suite: Suite) -> None: """ Run a single testing suite and populate the results array. """ self._logger.info("Running suite %s", suite.name) self._logger.debug(suite) await libkirk.events.fire("suite_started", suite) info = await self._sut.get_info() start_t = 0.0 timed_out = False exec_times = [] tests_results: List[TestResults] = [] tests_left = list(suite.tests) reboot_event = asyncio.Event() try: while not self._stop and tests_left: try: start_t = time.time() await asyncio.wait_for( self._scheduler.schedule(tests_left), timeout=self._suite_timeout, ) except asyncio.TimeoutError: self._logger.info("Testing suite timed out: %s", suite.name) await libkirk.events.fire( "suite_timeout", suite, self._suite_timeout ) timed_out = True except (KernelPanicError, KernelTaintedError, KernelTimeoutError): if self._reboot_lock.locked(): self._logger.info("SUT is rebooting. Waiting...") try: await asyncio.wait_for(reboot_event.wait(), 3600) except asyncio.TimeoutError: self._logger.info("SUT reboot timed out") timed_out = True else: await self._restart_sut() reboot_event.set() finally: exec_times.append(time.time() - start_t) # pyrefly: ignore[bad-argument-type] tests_results.extend(self._scheduler.results) tests_left.clear() completed_test_names = { test_res.test.name for test_res in tests_results } tests_left.extend( [ test for test in suite.tests if test.name not in completed_test_names ] ) if timed_out: for test in tests_left: tests_results.append( TestResults( test=test, failed=0, passed=0, broken=0, skipped=1, warnings=0, exec_time=0.0, retcode=32, stdout="", status=ResultStatus.CONF, ) ) tests_left.clear() break finally: suite_exec_time = self._suite_timeout if exec_times: suite_exec_time = sum(exec_times) suite_results = SuiteResults( suite=suite, tests=tests_results, distro=info["distro"], distro_ver=info["distro_ver"], kernel=info["kernel"], cmdline=info["cmdline"], arch=info["arch"], cpu=info["cpu"], swap=info["swap"], ram=info["ram"], ) await libkirk.events.fire("suite_completed", suite_results, suite_exec_time) self._logger.info("Suite completed") self._logger.debug(suite_results) self._results.append(suite_results)
[docs] async def schedule(self, jobs: List[Any]) -> None: if not jobs: raise ValueError("jobs list is empty") for job in jobs: if not isinstance(job, Suite): raise ValueError("jobs must be a list of Suite") async with self._schedule_lock: self._results.clear() for suite in jobs: await self._run_suite(suite)