Source code for libkirk.monitor

"""
.. module:: monitor
    :platform: Linux
    :synopsis: modules used to generate real-time data from the executor

.. moduleauthor:: Andrea Cervesato <andrea.cervesato@suse.com>
"""

import asyncio
import json
import logging
from typing import (
    Any,
    Dict,
)

import libkirk
from libkirk.data import (
    Suite,
    Test,
)
from libkirk.io import AsyncFile
from libkirk.results import (
    SuiteResults,
    TestResults,
)


[docs] class JSONFileMonitor: """ Monitor the current executor status and it redirects events to a file using JSON format. """ def __init__(self, path: str) -> None: """ :param path: Path of the file. :type path: str """ self._logging = logging.getLogger("libkirk.monitor") self._logging.info("File to monitor: %s", path) self._lock = asyncio.Lock() self._path = path self._events: Dict[str, Any] = { "session_restore": self.session_restore, "session_started": self.session_started, "session_stopped": self.session_stopped, "sut_stdout": self.sut_stdout, "sut_start": self.sut_start, "sut_stop": self.sut_stop, "sut_restart": self.sut_restart, "sut_not_responding": self.sut_not_responding, "run_cmd_start": self.run_cmd_start, "run_cmd_stop": self.run_cmd_stop, "test_stdout": self.test_stdout, "test_started": self.test_started, "test_completed": self.test_completed, "test_timed_out": self.test_timed_out, "suite_started": self.suite_started, "suite_completed": self.suite_completed, "suite_timeout": self.suite_timeout, "session_warning": self.session_warning, "session_error": self.session_error, "kernel_panic": self.kernel_panic, "kernel_tainted": self.kernel_tainted, }
[docs] async def start(self) -> None: """ Attach to events and start writing data inside the monitor file. """ self._logging.info("Start monitoring") for name, coro in self._events.items(): libkirk.events.register(name, coro)
[docs] async def stop(self) -> None: """ Stop monitoring events. """ self._logging.info("Stop monitoring") for name, coro in self._events.items(): libkirk.events.unregister(name, coro)
async def _write(self, msg_type: str, msg: Dict[str, Any]) -> None: """ Write a message to the JSON file. """ data_str = json.dumps({"type": msg_type, "message": msg}) async with self._lock: async with AsyncFile(self._path, "w") as fdata: await fdata.write(data_str) @staticmethod def _test_to_dict(test: Test) -> Dict[str, Any]: """ Convert test into a dict which can be converted to JSON. """ return { "name": test.name, "command": test.command, "arguments": test.arguments, "parallelizable": test.parallelizable, "cwd": test.cwd, "env": test.env, } def _suite_to_dict(self, suite: Suite) -> Dict[str, Any]: """ Translate suite into a dict which can be converted into JSON. """ return { "name": suite.name, "tests": [self._test_to_dict(test) for test in suite.tests], }
[docs] async def session_restore(self, restore: str) -> None: await self._write("session_restore", {"restore": restore})
[docs] async def session_started(self, suites: list, tmpdir: str) -> None: await self._write("session_started", {"tmpdir": tmpdir})
[docs] async def session_stopped(self) -> None: await self._write("session_stopped", {})
[docs] async def sut_stdout(self, sut: str, data: str) -> None: await self._write("sut_stdout", {"sut": sut, "data": data})
[docs] async def sut_start(self, sut: str) -> None: await self._write("sut_start", {"sut": sut})
[docs] async def sut_stop(self, sut: str) -> None: await self._write("sut_stop", {"sut": sut})
[docs] async def sut_restart(self, sut: str) -> None: await self._write("sut_restart", {"sut": sut})
[docs] async def sut_not_responding(self) -> None: await self._write("sut_not_responding", {})
[docs] async def run_cmd_start(self, cmd: str) -> None: await self._write("run_cmd_start", {"cmd": cmd})
[docs] async def run_cmd_stop(self, command: str, stdout: str, returncode: int) -> None: await self._write( "run_cmd_stop", {"command": command, "stdout": stdout, "returncode": returncode}, )
[docs] async def test_stdout(self, test: Test, data: str) -> None: await self._write( "test_stdout", {"test": self._test_to_dict(test), "data": data}, )
[docs] async def test_started(self, test: Test) -> None: await self._write("test_started", {"test": self._test_to_dict(test)})
[docs] async def test_completed(self, results: TestResults) -> None: await self._write( "test_completed", { "test": self._test_to_dict(results.test), "stdout": results.stdout, "status": results.status, "exec_time": results.exec_time, "passed": results.passed, "failed": results.failed, "broken": results.broken, "skipped": results.skipped, "warnings": results.warnings, }, )
[docs] async def test_timed_out(self, test: Test, timeout: int) -> None: await self._write( "test_timed_out", {"test": self._test_to_dict(test), "timeout": timeout} )
[docs] async def suite_started(self, suite: Suite) -> None: await self._write("suite_started", self._suite_to_dict(suite))
[docs] async def suite_completed(self, results: SuiteResults, exec_time: float) -> None: # Optimize: Create dict directly in function call await self._write( "suite_completed", { "suite": self._suite_to_dict(results.suite), "exec_time": exec_time, "total_run": len(results.suite.tests), "passed": results.passed, "failed": results.failed, "skipped": results.skipped, "broken": results.broken, "warnings": results.warnings, "kernel_version": results.kernel, "cmdline": results.cmdline, "cpu": results.cpu, "arch": results.arch, "ram": results.ram, "swap": results.swap, "distro": results.distro, "distro_version": results.distro_ver, }, )
[docs] async def suite_timeout(self, suite: Suite, timeout: float) -> None: await self._write( "suite_timeout", {"suite": self._suite_to_dict(suite), "timeout": timeout}, )
[docs] async def session_warning(self, msg: str) -> None: await self._write("session_warning", {"message": msg})
[docs] async def session_error(self, error: str) -> None: await self._write("session_error", {"error": error})
[docs] async def kernel_panic(self) -> None: await self._write("kernel_panic", {})
[docs] async def kernel_tainted(self, message: str) -> None: await self._write("kernel_tainted", {"message": message})