"""
.. module:: monitor
:platform: Linux
:synopsis: modules used to generate real-time data from the executor
.. moduleauthor:: Andrea Cervesato <andrea.cervesato@suse.com>
"""
import asyncio
import json
import logging
from typing import (
Any,
Dict,
)
import libkirk
from libkirk.data import (
Suite,
Test,
)
from libkirk.io import AsyncFile
from libkirk.results import (
SuiteResults,
TestResults,
)
[docs]
class JSONFileMonitor:
"""
Monitor the current executor status and it redirects events to a file
using JSON format.
"""
def __init__(self, path: str) -> None:
"""
:param path: Path of the file.
:type path: str
"""
self._logging = logging.getLogger("libkirk.monitor")
self._logging.info("File to monitor: %s", path)
self._lock = asyncio.Lock()
self._path = path
self._events: Dict[str, Any] = {
"session_restore": self.session_restore,
"session_started": self.session_started,
"session_stopped": self.session_stopped,
"sut_stdout": self.sut_stdout,
"sut_start": self.sut_start,
"sut_stop": self.sut_stop,
"sut_restart": self.sut_restart,
"sut_not_responding": self.sut_not_responding,
"run_cmd_start": self.run_cmd_start,
"run_cmd_stop": self.run_cmd_stop,
"test_stdout": self.test_stdout,
"test_started": self.test_started,
"test_completed": self.test_completed,
"test_timed_out": self.test_timed_out,
"suite_started": self.suite_started,
"suite_completed": self.suite_completed,
"suite_timeout": self.suite_timeout,
"session_warning": self.session_warning,
"session_error": self.session_error,
"kernel_panic": self.kernel_panic,
"kernel_tainted": self.kernel_tainted,
}
[docs]
async def start(self) -> None:
"""
Attach to events and start writing data inside the monitor file.
"""
self._logging.info("Start monitoring")
for name, coro in self._events.items():
libkirk.events.register(name, coro)
[docs]
async def stop(self) -> None:
"""
Stop monitoring events.
"""
self._logging.info("Stop monitoring")
for name, coro in self._events.items():
libkirk.events.unregister(name, coro)
async def _write(self, msg_type: str, msg: Dict[str, Any]) -> None:
"""
Write a message to the JSON file.
"""
data_str = json.dumps({"type": msg_type, "message": msg})
async with self._lock:
async with AsyncFile(self._path, "w") as fdata:
await fdata.write(data_str)
@staticmethod
def _test_to_dict(test: Test) -> Dict[str, Any]:
"""
Convert test into a dict which can be converted to JSON.
"""
return {
"name": test.name,
"command": test.command,
"arguments": test.arguments,
"parallelizable": test.parallelizable,
"cwd": test.cwd,
"env": test.env,
}
def _suite_to_dict(self, suite: Suite) -> Dict[str, Any]:
"""
Translate suite into a dict which can be converted into JSON.
"""
return {
"name": suite.name,
"tests": [self._test_to_dict(test) for test in suite.tests],
}
[docs]
async def session_restore(self, restore: str) -> None:
await self._write("session_restore", {"restore": restore})
[docs]
async def session_started(self, suites: list, tmpdir: str) -> None:
await self._write("session_started", {"tmpdir": tmpdir})
[docs]
async def session_stopped(self) -> None:
await self._write("session_stopped", {})
[docs]
async def sut_stdout(self, sut: str, data: str) -> None:
await self._write("sut_stdout", {"sut": sut, "data": data})
[docs]
async def sut_start(self, sut: str) -> None:
await self._write("sut_start", {"sut": sut})
[docs]
async def sut_stop(self, sut: str) -> None:
await self._write("sut_stop", {"sut": sut})
[docs]
async def sut_restart(self, sut: str) -> None:
await self._write("sut_restart", {"sut": sut})
[docs]
async def sut_not_responding(self) -> None:
await self._write("sut_not_responding", {})
[docs]
async def run_cmd_start(self, cmd: str) -> None:
await self._write("run_cmd_start", {"cmd": cmd})
[docs]
async def run_cmd_stop(self, command: str, stdout: str, returncode: int) -> None:
await self._write(
"run_cmd_stop",
{"command": command, "stdout": stdout, "returncode": returncode},
)
[docs]
async def test_stdout(self, test: Test, data: str) -> None:
await self._write(
"test_stdout",
{"test": self._test_to_dict(test), "data": data},
)
[docs]
async def test_started(self, test: Test) -> None:
await self._write("test_started", {"test": self._test_to_dict(test)})
[docs]
async def test_completed(self, results: TestResults) -> None:
await self._write(
"test_completed",
{
"test": self._test_to_dict(results.test),
"stdout": results.stdout,
"status": results.status,
"exec_time": results.exec_time,
"passed": results.passed,
"failed": results.failed,
"broken": results.broken,
"skipped": results.skipped,
"warnings": results.warnings,
},
)
[docs]
async def test_timed_out(self, test: Test, timeout: int) -> None:
await self._write(
"test_timed_out", {"test": self._test_to_dict(test), "timeout": timeout}
)
[docs]
async def suite_started(self, suite: Suite) -> None:
await self._write("suite_started", self._suite_to_dict(suite))
[docs]
async def suite_completed(self, results: SuiteResults, exec_time: float) -> None:
# Optimize: Create dict directly in function call
await self._write(
"suite_completed",
{
"suite": self._suite_to_dict(results.suite),
"exec_time": exec_time,
"total_run": len(results.suite.tests),
"passed": results.passed,
"failed": results.failed,
"skipped": results.skipped,
"broken": results.broken,
"warnings": results.warnings,
"kernel_version": results.kernel,
"cmdline": results.cmdline,
"cpu": results.cpu,
"arch": results.arch,
"ram": results.ram,
"swap": results.swap,
"distro": results.distro,
"distro_version": results.distro_ver,
},
)
[docs]
async def suite_timeout(self, suite: Suite, timeout: float) -> None:
await self._write(
"suite_timeout",
{"suite": self._suite_to_dict(suite), "timeout": timeout},
)
[docs]
async def session_warning(self, msg: str) -> None:
await self._write("session_warning", {"message": msg})
[docs]
async def session_error(self, error: str) -> None:
await self._write("session_error", {"error": error})
[docs]
async def kernel_panic(self) -> None:
await self._write("kernel_panic", {})
[docs]
async def kernel_tainted(self, message: str) -> None:
await self._write("kernel_tainted", {"message": message})