Source code for libkirk.main

"""
.. module:: main
    :platform: Linux
    :synopsis: main script

.. moduleauthor:: Andrea Cervesato <andrea.cervesato@suse.com>
"""

import argparse
import asyncio
import os
import re
from typing import (
    Dict,
    List,
    Optional,
    Union,
)

import libkirk
import libkirk.com
import libkirk.data
import libkirk.plugin
import libkirk.sut
from libkirk import __version__
from libkirk.com import ComChannel
from libkirk.errors import (
    CommunicationError,
    KirkException,
    SUTError,
)
from libkirk.monitor import JSONFileMonitor
from libkirk.session import Session
from libkirk.sut import SUT
from libkirk.tempfile import TempDir
from libkirk.ui import (
    ParallelUserInterface,
    SimpleUserInterface,
    VerboseUserInterface,
)

# Maximum number of COM instances
MAX_COM_INSTANCES = 128

# return codes of the application
RC_OK = 0
RC_ERROR = 1
RC_INTERRUPT = 130


def _from_params_to_config(params: List[str]) -> Dict[str, str]:
    """
    Return a configuration as dictionary according with input parameters
    given to the commandline option.
    """
    config = {}
    for param in params:
        if "=" not in param:
            raise argparse.ArgumentTypeError(
                f"Missing '=' assignment in '{param}' parameter"
            )

        data = param.split("=", 1)
        key = data[0]
        value = data[1]

        if not key:
            raise argparse.ArgumentTypeError(f"Empty key for '{param}' parameter")

        if not value:
            raise argparse.ArgumentTypeError(f"Empty value for '{param}' parameter")

        config[key] = value

    return config


def _dict_config(
    value: str,
) -> Dict[str, str]:
    """
    Generic dictionary option configuration.
    """
    if value == "help":
        return {"help": ""}

    if not value:
        raise argparse.ArgumentTypeError("Parameters list can't be empty")

    params = value.split(":")

    config = _from_params_to_config(params[1:])
    config["name"] = params[0]

    return config


def _com_config(value: str) -> Optional[Dict[str, str]]:
    """
    Return the list of channels configurations.
    """
    plugins = libkirk.com.get_channels()
    config = _dict_config(value)

    if "help" in config:
        return config

    name = config["name"]

    plugin_names = {p.name for p in plugins}
    if name not in plugin_names:
        raise argparse.ArgumentTypeError(
            f"Can't find communication handler with name '{name}'"
        )

    return config


def _print_plugin_help(
    opt_name: str,
    plugins: Union[List[ComChannel], List[SUT]],
) -> None:
    """
    Print the ``plugins`` help for ``opt_name`` option.
    """
    msg = f"{opt_name} option supports the following syntax:\n"
    msg += "\n\t<name>:<param1>=<value1>:<param2>=<value2>:..\n"
    msg += "\nSupported plugins: | "

    for plugin in plugins:
        msg += f"{plugin.name} | "

    msg += "\n"

    for plugin in plugins:
        if not plugin.config_help:
            msg += f"\n{plugin.name} has not configuration\n"
        else:
            msg += f"\n{plugin.name} configuration:\n"
            for opt, desc in plugin.config_help.items():
                msg += f"\t{opt}: {desc}\n"

    print(msg)


def _iterate_config(value: str) -> int:
    """
    Return the iterate value.
    """
    if not value:
        return 1

    try:
        ret = int(value)
    except TypeError as err:
        raise argparse.ArgumentTypeError("Invalid number") from err

    return max(1, ret)


def _time_config(data: str) -> int:
    """
    Return the time in seconds from '30s', '4m', '5h', '20d' format.
    If no suffix is specified, value is considered in seconds.
    """
    indata = data.strip()

    match = re.search(r"^(?P<value>\d+)\s*(?P<suffix>[smhd]?)$", indata)
    if not match:
        raise argparse.ArgumentTypeError(f"Incorrect time format '{indata}'")

    value = int(match.group("value"))
    suffix = match.group("suffix")

    # Optimize: use dict lookup instead of if/elif chain
    multipliers = {
        "": 1,
        "s": 1,
        "m": 60,
        "h": 3600,
        "d": 86400,  # 3600 * 24
    }

    return value * multipliers.get(suffix, 1)


def _finjection_config(value: str) -> int:
    """
    Return probability of fault injection.
    """
    if not value:
        return 0

    try:
        ret = int(value)
    except TypeError as err:
        raise argparse.ArgumentTypeError("Invalid number") from err

    return max(0, min(100, ret))


def _finterval_config(value: str) -> int:
    """
    Return interval of fault injection.
    """
    if not value:
        return 1

    try:
        ret = int(value)
    except TypeError as err:
        raise argparse.ArgumentTypeError("Invalid number") from err

    return max(1, ret)


def _get_skip_tests(skip_tests: str, skip_file: str) -> str:
    """
    Return the skipped tests regexp.
    """
    skip_parts = []

    if skip_file:
        with open(skip_file, "r", encoding="utf-8") as skip_file_data:
            lines = skip_file_data.readlines()

        toskip = [
            line.rstrip()
            for line in lines
            if line.strip() and not re.search(r"^\s*#", line)
        ]
        if toskip:
            skip_parts.append("|".join(toskip))

    if skip_tests:
        skip_parts.append(skip_tests)

    return "|".join(skip_parts)


def _init_channels(
    args: argparse.Namespace, parser: argparse.ArgumentParser, tmpdir: TempDir
) -> None:
    """
    Initialize channels according to configuration.
    """
    for config in args.com:
        if "id" in config:
            plugin = libkirk.com.clone_channel(config["name"], config["id"])
        else:
            name = config["name"]
            plugin = next(
                (c for c in libkirk.com.get_channels() if c.name == name),
                None,
            )

            assert plugin

        com_config = config.copy()
        com_config["tmpdir"] = tmpdir.abspath

        try:
            # pyrefly: ignore[bad-argument-type]
            plugin.setup(**com_config)
        except CommunicationError as err:
            parser.error(str(err))


def _get_sut(
    args: argparse.Namespace, parser: argparse.ArgumentParser, tmpdir: TempDir
) -> SUT:
    """
    Create and return SUT object.
    """
    sut_config = args.sut.copy()
    sut_config["tmpdir"] = tmpdir.abspath

    sut_name = args.sut["name"]
    sut = next((s for s in libkirk.sut.get_suts() if s.name == sut_name), None)
    if not sut:
        parser.error(f"'{sut_name}' SUT is not available")

    # pyrefly: ignore[missing-attribute]
    sut.optimize = args.optimize_sut

    try:
        # pyrefly: ignore[missing-attribute]
        sut.setup(**sut_config)
    except SUTError as err:
        parser.error(str(err))

    # pyrefly: ignore[bad-return]
    return sut


def _start_session(args: argparse.Namespace, parser: argparse.ArgumentParser) -> None:
    """
    Start the LTP session.
    """
    skip_tests = _get_skip_tests(args.skip_tests, args.skip_file)
    if skip_tests:
        try:
            re.compile(skip_tests)
        except re.error:
            parser.error(f"'{skip_tests}' is not a valid regular expression")

    # check if session can be restored
    restore_dir = args.restore
    if restore_dir and os.path.islink(args.restore):
        restore_dir = os.readlink(args.restore)

    if restore_dir and not os.path.isdir(restore_dir):
        parser.error(f"Can't restore '{args.restore}'. Folder doesn't exist")

    if args.tmp_dir == "":
        tmpdir = TempDir(None)
    elif args.tmp_dir:
        tmpdir = TempDir(args.tmp_dir)
    else:
        tmpdir = TempDir("/tmp")

    # initialize channels
    if args.com:
        _init_channels(args, parser, tmpdir)

    # create SUT
    sut = _get_sut(args, parser, tmpdir)

    # start session
    session = Session(
        tmpdir=tmpdir,
        sut=sut,
        exec_timeout=args.exec_timeout,
        suite_timeout=args.suite_timeout,
        workers=args.workers,
        force_parallel=args.force_parallel,
    )

    # initialize monitor file
    monitor = None
    if args.monitor:
        monitor = JSONFileMonitor(args.monitor)

    # initialize user interface
    if args.workers > 1:
        ParallelUserInterface(args.no_colors)
    elif args.verbose:
        VerboseUserInterface(args.no_colors)
    else:
        SimpleUserInterface(args.no_colors)

    # read tests regex filter
    run_pattern = args.run_pattern
    if run_pattern:
        try:
            re.compile(run_pattern)
        except re.error:
            parser.error(f"'{run_pattern}' is not a valid regular expression")

    async def session_run() -> None:
        """
        Run session then stop events handler.
        """
        try:
            if monitor:
                await monitor.start()

            await session.run(
                command=args.run_command,
                suites=args.run_suite,
                pattern=run_pattern,
                report_path=args.json_report,
                restore_path=restore_dir,
                suite_iterate=args.suite_iterate,
                skip_tests=skip_tests,
                randomize=args.randomize,
                runtime=args.runtime,
                fault_prob=args.fault_injection,
                fault_interval=args.fault_interval,
            )
        except asyncio.CancelledError:
            await session.stop()
        finally:
            await libkirk.events.stop()
            if monitor:
                await monitor.stop()

    loop = libkirk.get_event_loop()
    exit_code = RC_OK

    try:
        loop.run_until_complete(
            # pyrefly: ignore[bad-argument-type]
            asyncio.gather(*[libkirk.events.start(), session_run()])
        )
    except KeyboardInterrupt:
        exit_code = RC_INTERRUPT
    except KirkException:
        exit_code = RC_ERROR
    finally:
        try:
            loop.run_until_complete(session.stop())
        except KeyboardInterrupt:
            loop.run_until_complete(session.stop())

        libkirk.cancel_tasks(loop)
        loop.run_until_complete(libkirk.events.stop())

    parser.exit(exit_code)


[docs] def run(cmd_args: Optional[List[str]] = None) -> None: """ Entry point of the application. :param cmd_args: Command line arguments. :type cmd_args: list(str) | None """ currdir = os.path.dirname(os.path.realpath(__file__)) libkirk.com.discover(os.path.join(currdir, "channels")) libkirk.sut.discover(currdir) parser = argparse.ArgumentParser( description="Kirk - All-in-one Linux Testing Framework" ) generic_opts = parser.add_argument_group("General options") generic_opts.add_argument( "--version", "-V", action="version", version=f"%(prog)s, {__version__}" ) generic_opts.add_argument( "--verbose", "-v", action="store_true", help="Verbose mode" ) generic_opts.add_argument( "--no-colors", "-n", action="store_true", help="If defined, no colors are shown" ) generic_opts.add_argument( "--tmp-dir", "-d", type=str, default="/tmp", help="Temporary directory" ) generic_opts.add_argument( "--restore", "-r", type=str, help="Restore a specific session" ) generic_opts.add_argument( "--json-report", "-o", type=str, help="JSON output report" ) generic_opts.add_argument( "--monitor", "-m", type=str, help="Location of the monitor file" ) generic_opts.add_argument( "--plugins", "-P", type=str, help="Location of custom plugins" ) conf_opts = parser.add_argument_group("Configuration options") conf_opts.add_argument( "--com", "-C", type=_com_config, action="append", help="Communication channel parameters. For help please use '--com help'", ) conf_opts.add_argument( "--sut", "-u", default="default", type=lambda x: _dict_config(x), help="System Under Test parameters. For help please use '--sut help'", ) conf_opts.add_argument("--skip-tests", "-s", type=str, help="Skip specific tests") conf_opts.add_argument( "--skip-file", "-S", type=str, help="Skip specific tests using a skip file (newline separated item)", ) exec_opts = parser.add_argument_group("Execution options") exec_opts.add_argument("--run-suite", "-f", nargs="*", help="List of suites to run") exec_opts.add_argument( "--run-pattern", "-p", help="Run all tests matching the regex pattern" ) exec_opts.add_argument("--run-command", "-c", help="Command to run") exec_opts.add_argument( "--suite-timeout", "-T", type=_time_config, default="1h", help="Timeout before stopping the suite (default: 1h)", ) exec_opts.add_argument( "--exec-timeout", "-t", type=_time_config, default="1h", help="Timeout before stopping a single execution (default: 1h)", ) exec_opts.add_argument( "--randomize", "-R", action="store_true", help="Force parallelization execution of all tests", ) exec_opts.add_argument( "--runtime", "-I", type=_time_config, default="0", help="Set for how long we want to run the session in seconds", ) exec_opts.add_argument( "--suite-iterate", "-i", type=_iterate_config, default=1, help="Number of times to repeat testing suites", ) exec_opts.add_argument( "--workers", "-w", type=int, default=1, help="Number of workers to execute tests in parallel", ) exec_opts.add_argument( "--force-parallel", "-W", action="store_true", help="Force parallelization execution of all tests", ) exec_opts.add_argument( "--fault-injection", "-F", type=_finjection_config, default=0, help="Probability of failure (0-100)", ) exec_opts.add_argument( "--fault-interval", type=_finterval_config, default=1, help="Fault injection interval (default: 1)", ) exec_opts.add_argument( "--optimize-sut", "-O", action="store_true", help="Communicate with SUT using commands parallelization (default: false)", ) # output arguments # parse comand line args = parser.parse_args(cmd_args) if args.plugins: if not os.path.isdir(args.plugins): parser.error(f"'{args.plugins}' plugins directory doesn't exist") libkirk.com.discover(args.plugins) libkirk.sut.discover(args.plugins) if args.com and any("help" in obj for obj in args.com): _print_plugin_help("--com", libkirk.com.get_channels()) parser.exit(RC_OK) if args.com and len(args.com) >= MAX_COM_INSTANCES: parser.error(f"Maximum number of communication objects is {MAX_COM_INSTANCES}") if args.sut and "help" in args.sut: _print_plugin_help("--sut", libkirk.sut.get_suts()) parser.exit(RC_OK) if args.json_report and os.path.exists(args.json_report): parser.error(f"JSON report file already exists: {args.json_report}") if args.run_pattern and not args.run_suite: parser.error("--run-pattern must be used with --run-suite") if not args.run_suite and not args.run_command: parser.error("--run-suite/--run-command are required") if args.skip_file and not os.path.isfile(args.skip_file): parser.error(f"'{args.skip_file}' skip file doesn't exist") if args.tmp_dir and not os.path.isdir(args.tmp_dir): parser.error(f"'{args.tmp_dir}' temporary folder doesn't exist") _start_session(args, parser)
if __name__ == "__main__": run()