Source code for libkirk.ltp

"""
.. module:: ltp
    :platform: Linux
    :synopsis: LTP framework definition

.. moduleauthor:: Andrea Cervesato <andrea.cervesato@suse.com>
"""

import json
import logging
import os
import re
from typing import (
    Any,
    Dict,
    List,
    Optional,
)

from libkirk.com import ComChannel
from libkirk.data import (
    Suite,
    Test,
)
from libkirk.errors import FrameworkError
from libkirk.framework import Framework
from libkirk.results import (
    ResultStatus,
    TestResults,
)

# Mapping from LTP return code to ResultStatus
_RETCODE_STATUS: Dict[int, int] = {
    0: ResultStatus.PASS,
    2: ResultStatus.BROK,
    -1: ResultStatus.BROK,
    4: ResultStatus.WARN,
    32: ResultStatus.CONF,
}

_ANSI_ESCAPE = re.compile(r"\u001b\[[0-9;]+[a-zA-Z]")

_SUMMARY_RE = re.compile(
    r"Summary:\n"
    r"passed\s*(?P<passed>\d+)\n"
    r"failed\s*(?P<failed>\d+)\n"
    r"broken\s*(?P<broken>\d+)\n"
    r"skipped\s*(?P<skipped>\d+)\n"
    r"warnings\s*(?P<warnings>\d+)\n",
)


[docs] class LTPFramework(Framework): """ Linux Test Project framework definition. """ # Tags whose presence marks a test as non-parallelizable. PARALLEL_BLACKLIST: frozenset = frozenset( { "needs_root", "needs_device", "mount_device", "mntpoint", "resource_file", "format_device", "save_restore", "max_runtime", } ) # Environment variables without the `LTP_` or `TST_` prefix that are still forwarded. SUPPORTED_ENV: frozenset = frozenset( { "PATH", "KCONFIG_PATH", "KCONFIG_SKIP_CHECK", # LTP network test variables "RHOST", "IPV4_LHOST", "IPV4_RHOST", "IPV6_LHOST", "IPV6_RHOST", "LHOST_IFACES", "RHOST_IFACES", # Stress test parameters "NS_DURATION", "NS_TIMES", "CONNECTION_TOTAL", "IP_TOTAL", "IP_TOTAL_FOR_TCPIP", "ROUTE_TOTAL", "ROUTE_CHANGE_IP", "ROUTE_CHANGE_NETLINK", "MTU_CHANGE_TIMES", "IF_UPDOWN_TIMES", "PING_MAX", # ICMP / multicast "NS_ICMPV4_SENDER_DATA_MAXSIZE", "NS_ICMPV6_SENDER_DATA_MAXSIZE", "MCASTNUM_NORMAL", "MCASTNUM_HEAVY", # File transfer / services "DOWNLOAD_BIGFILESIZE", "DOWNLOAD_REGFILESIZE", "UPLOAD_BIGFILESIZE", "UPLOAD_REGFILESIZE", "HTTP_DOWNLOAD_DIR", "FTP_DOWNLOAD_DIR", "FTP_UPLOAD_DIR", "FTP_UPLOAD_URLDIR", # IPsec / virtual interfaces "IPSEC_MODE", "IPSEC_PROTO", "VIRT_PERF_THRESHOLD", } ) # Variables set explicitly in _update_env_vars; skip them in the loop. _PRESET_ENV: frozenset = frozenset( { "LTPROOT", "TMPDIR", "LTP_COLORIZE_OUTPUT", "LTP_TIMEOUT_MUL", } ) def __init__( self, max_runtime: float = 0.0, timeout: float = 30.0, ) -> None: """ :param max_runtime: filter out all tests above this time value :type max_runtime: float :param timeout: generic tests timeout :type timeout: float """ self._logger = logging.getLogger("libkirk.ltp") self._cmd_matcher = re.compile(r'(?:"[^"]*"|\'[^\']*\'|\S+)') self._max_runtime = max_runtime self._root = os.environ.get("LTPROOT", "/opt/ltp") self._tc_folder = os.path.join(self._root, "testcases", "bin") self._env: Dict[str, str] = {} self._update_env_vars(timeout) def _update_env_vars(self, timeout: float) -> None: """ Populate self._env with LTP-relevant environment variables. """ self._env["LTPROOT"] = self._root self._env["TMPDIR"] = os.environ.get("TMPDIR", "/tmp") self._env["LTP_COLORIZE_OUTPUT"] = os.environ.get("LTP_COLORIZE_OUTPUT", "1") multiplier = os.environ.get("LTP_TIMEOUT_MUL") if multiplier: self._env["LTP_TIMEOUT_MUL"] = multiplier elif timeout: self._env["LTP_TIMEOUT_MUL"] = str((timeout * 0.9) / 300.0) for key, val in os.environ.items(): if key in self._PRESET_ENV: continue if key in self.SUPPORTED_ENV or key.startswith("LTP_") or key.startswith("TST_"): self._env[key] = val async def _read_path(self, channel: ComChannel) -> Dict[str, str]: """ Return a copy of self._env with the testcases folder appended to PATH. """ env = self._env.copy() if "PATH" in env: env["PATH"] = f"{env['PATH']}:{self._tc_folder}" else: ret = await channel.run_command("echo -n $PATH") if not ret or ret["returncode"] != 0: raise FrameworkError("Can't read PATH variable") env["PATH"] = f"{ret['stdout'].strip()}:{self._tc_folder}" self._logger.debug("PATH=%s", env["PATH"]) return env def _is_addable(self, test_params: Dict[str, Any]) -> bool: """ Return False when max_runtime filtering is active and the test exceeds the configured limit. """ if not self._max_runtime: return True runtime = test_params.get("max_runtime") if runtime is None: return True try: if float(runtime) >= self._max_runtime: self._logger.info("max_runtime is bigger than %f", self._max_runtime) return False except TypeError: self._logger.error("metadata contains wrong max_runtime type: %s", runtime) return True def _get_cmd_args(self, line: str) -> List[str]: """ Split a runtest line into a list of command + arguments. Handles quoted arguments, e.g.:: cmd -c "cmd2 -g arg1 -t arg2" """ return self._cmd_matcher.findall(line) async def _read_runtest( self, channel: ComChannel, suite_name: str, content: str, metadata: Optional[dict] = None, ) -> Suite: """ Parse a runtest file and return the corresponding Suite. """ self._logger.info("collecting testing suite: %s", suite_name) metadata_tests: Optional[dict] = None if metadata: self._logger.info("Reading metadata content") metadata_tests = metadata.get("tests") env = await self._read_path(channel) tests: List[Test] = [] for line in content.split("\n"): line = line.strip() if not line or line.startswith("#"): continue self._logger.debug("Test declaration: %s", line) parts = self._get_cmd_args(line) if len(parts) < 2: raise FrameworkError("runtest file is not defining test command") test_name, test_cmd, *test_args = parts parallelizable = False if metadata_tests is not None: test_params = metadata_tests.get(test_name) if test_params is None: # Test not using the new LTP API – parallelism unknown. self._logger.info("No %s test params in metadata", test_name) else: self._logger.info("Found %s test params in metadata", test_name) self._logger.debug("params=%s", test_params) if not self._is_addable(test_params): continue parallelizable = not (self.PARALLEL_BLACKLIST & test_params.keys()) self._logger.info( "Test '%s' is%s parallelizable", test_name, "" if parallelizable else " not", ) test = Test( name=test_name, cmd=test_cmd, args=test_args, cwd=self._tc_folder, env=env, parallelizable=parallelizable, ) tests.append(test) self._logger.debug("test: %s", test) self._logger.debug("Collected tests: %d", len(tests)) suite = Suite(suite_name, tests) self._logger.debug(suite) self._logger.info("Collected testing suite: %s", suite_name) return suite
[docs] async def get_suites(self, channel: ComChannel) -> List[str]: if not channel: raise ValueError("SUT is None") ret = await channel.run_command(f"test -d {self._root}") if not ret or ret["returncode"] != 0: raise FrameworkError(f"LTP folder doesn't exist: {self._root}") runtest_dir = os.path.join(self._root, "runtest") ret = await channel.run_command(f"test -d {runtest_dir}") if not ret or ret["returncode"] != 0: raise FrameworkError(f"'{runtest_dir}' doesn't exist inside SUT") ret = await channel.run_command(f"ls --format=single-column {runtest_dir}") if not ret: raise FrameworkError("Can't communicate with SUT") stdout = ret["stdout"] if ret["returncode"] != 0: raise FrameworkError(f"command failed with: {stdout}") return [line for line in stdout.split("\n") if line]
[docs] async def find_command(self, channel: ComChannel, command: str) -> Test: if not channel: raise ValueError("SUT is None") if not command: raise ValueError("command is empty") cmd_args = self._get_cmd_args(command) cwd = None env = None ret = await channel.run_command(f"test -d {self._tc_folder}") if ret and ret["returncode"] == 0: cwd = self._tc_folder env = await self._read_path(channel) return Test( name=cmd_args[0], cmd=cmd_args[0], args=cmd_args[1:] or None, cwd=cwd, env=env, parallelizable=False, )
[docs] async def find_suite(self, channel: ComChannel, name: str) -> Suite: if not channel: raise ValueError("SUT is None") if not name: raise ValueError("name is empty") ret = await channel.run_command(f"test -d {self._root}") if not ret or ret["returncode"] != 0: raise FrameworkError(f"LTP folder doesn't exist: {self._root}") suite_path = os.path.join(self._root, "runtest", name) ret = await channel.run_command(f"test -f {suite_path}") if not ret or ret["returncode"] != 0: raise FrameworkError(f"'{name}' suite doesn't exist") runtest_str = (await channel.fetch_file(suite_path)).decode( encoding="utf-8", errors="ignore" ) metadata_dict = None metadata_path = os.path.join(self._root, "metadata", "ltp.json") ret = await channel.run_command(f"test -f {metadata_path}") if ret and ret["returncode"] == 0: metadata_dict = json.loads(await channel.fetch_file(metadata_path)) return await self._read_runtest(channel, name, runtest_str, metadata_dict)
[docs] async def read_result( self, test: Test, stdout: str, retcode: int, exec_t: float ) -> TestResults: stdout = _ANSI_ESCAPE.sub("", stdout) match = _SUMMARY_RE.search(stdout) if match: passed = int(match.group("passed")) failed = int(match.group("failed")) broken = int(match.group("broken")) skipped = int(match.group("skipped")) warnings = int(match.group("warnings")) else: passed = stdout.count("TPASS") failed = stdout.count("TFAIL") skipped = stdout.count("TSKIP") broken = stdout.count("TBROK") warnings = stdout.count("TWARN") if not any((passed, failed, skipped, broken, warnings)): # Legacy test: derive counts from the return code alone. if retcode == 0: passed = 1 elif retcode == 4: warnings = 1 elif retcode == 32: skipped = 1 elif retcode != -1: failed = 1 status = _RETCODE_STATUS.get(retcode, ResultStatus.FAIL) if retcode == -1: broken = 1 return TestResults( test=test, failed=failed, passed=passed, broken=broken, skipped=skipped, warnings=warnings, exec_time=exec_t, retcode=retcode, stdout=stdout, status=status, )