ci/lava: Refactor LAVAJobSubmitter and add tests
Some refactoring was needed to make LAVAJobSubmitter class testable via pytest. Signed-off-by: Guilherme Gallo <guilherme.gallo@collabora.com> Part-of: <https://gitlab.freedesktop.org/mesa/mesa/-/merge_requests/22500>
This commit is contained in:

committed by
Marge Bot

parent
710b568dcd
commit
11a97b644c
@@ -12,9 +12,10 @@ class MesaCITimeoutError(MesaCIException):
|
||||
|
||||
|
||||
class MesaCIRetryError(MesaCIException):
|
||||
def __init__(self, *args, retry_count: int) -> None:
|
||||
def __init__(self, *args, retry_count: int, last_job: None) -> None:
|
||||
super().__init__(*args)
|
||||
self.retry_count = retry_count
|
||||
self.last_job = last_job
|
||||
|
||||
|
||||
class MesaCIParseException(MesaCIException):
|
||||
|
@@ -18,7 +18,7 @@ from collections import defaultdict
|
||||
from dataclasses import dataclass, fields
|
||||
from datetime import datetime, timedelta
|
||||
from io import StringIO
|
||||
from os import getenv
|
||||
from os import environ, getenv, path
|
||||
from typing import Any, Optional
|
||||
|
||||
import fire
|
||||
@@ -282,6 +282,7 @@ def print_job_final_status(job):
|
||||
def execute_job_with_retries(
|
||||
proxy, job_definition, retry_count, jobs_log
|
||||
) -> Optional[LAVAJob]:
|
||||
last_failed_job = None
|
||||
for attempt_no in range(1, retry_count + 2):
|
||||
# Need to get the logger value from its object to enable autosave
|
||||
# features, if AutoSaveDict is enabled from StructuredLogging module
|
||||
@@ -290,41 +291,52 @@ def execute_job_with_retries(
|
||||
job = LAVAJob(proxy, job_definition, job_log)
|
||||
STRUCTURAL_LOG["dut_attempt_counter"] = attempt_no
|
||||
try:
|
||||
job_log["submitter_start_time"] = datetime.now().isoformat()
|
||||
submit_job(job)
|
||||
wait_for_job_get_started(job)
|
||||
log_follower: LogFollower = bootstrap_log_follower()
|
||||
follow_job_execution(job, log_follower)
|
||||
return job
|
||||
|
||||
except (MesaCIException, KeyboardInterrupt) as exception:
|
||||
job.handle_exception(exception)
|
||||
|
||||
finally:
|
||||
print_job_final_status(job)
|
||||
# If LAVA takes too long to post process the job, the submitter
|
||||
# gives up and proceeds.
|
||||
job_log["submitter_end_time"] = datetime.now().isoformat()
|
||||
last_failed_job = job
|
||||
print_log(
|
||||
f"{CONSOLE_LOG['BOLD']}"
|
||||
f"Finished executing LAVA job in the attempt #{attempt_no}"
|
||||
f"{CONSOLE_LOG['RESET']}"
|
||||
)
|
||||
finally:
|
||||
job_log["finished_time"] = datetime.now().isoformat()
|
||||
print_job_final_status(job)
|
||||
|
||||
return last_failed_job
|
||||
|
||||
|
||||
def retriable_follow_job(proxy, job_definition) -> LAVAJob:
|
||||
number_of_retries = NUMBER_OF_RETRIES_TIMEOUT_DETECTION
|
||||
|
||||
if finished_job := execute_job_with_retries(
|
||||
last_attempted_job = execute_job_with_retries(
|
||||
proxy, job_definition, number_of_retries, STRUCTURAL_LOG["dut_jobs"]
|
||||
):
|
||||
return finished_job
|
||||
|
||||
# Job failed in all attempts
|
||||
raise MesaCIRetryError(
|
||||
f"{CONSOLE_LOG['BOLD']}"
|
||||
f"{CONSOLE_LOG['FG_RED']}"
|
||||
"Job failed after it exceeded the number of "
|
||||
f"{number_of_retries} retries."
|
||||
f"{CONSOLE_LOG['RESET']}",
|
||||
retry_count=number_of_retries,
|
||||
)
|
||||
|
||||
if last_attempted_job.exception is not None:
|
||||
# Infra failed in all attempts
|
||||
raise MesaCIRetryError(
|
||||
f"{CONSOLE_LOG['BOLD']}"
|
||||
f"{CONSOLE_LOG['FG_RED']}"
|
||||
"Job failed after it exceeded the number of "
|
||||
f"{number_of_retries} retries."
|
||||
f"{CONSOLE_LOG['RESET']}",
|
||||
retry_count=number_of_retries,
|
||||
last_job=last_attempted_job,
|
||||
)
|
||||
|
||||
return last_attempted_job
|
||||
|
||||
|
||||
@dataclass
|
||||
class PathResolver:
|
||||
@@ -371,20 +383,9 @@ class LAVAJobSubmitter(PathResolver):
|
||||
return
|
||||
|
||||
self.__structured_log_context = StructuredLoggerWrapper(self).logger_context()
|
||||
self.proxy = setup_lava_proxy()
|
||||
|
||||
def dump(self, job_definition):
|
||||
if self.dump_yaml:
|
||||
with GitlabSection(
|
||||
"yaml_dump",
|
||||
"LAVA job definition (YAML)",
|
||||
type=LogSectionType.LAVA_BOOT,
|
||||
start_collapsed=True,
|
||||
):
|
||||
print(hide_sensitive_data(job_definition))
|
||||
|
||||
def submit(self):
|
||||
proxy = setup_lava_proxy()
|
||||
|
||||
def __prepare_submission(self) -> str:
|
||||
# Overwrite the timeout for the testcases with the value offered by the
|
||||
# user. The testcase running time should be at least 4 times greater than
|
||||
# the other sections (boot and setup), so we can safely ignore them.
|
||||
@@ -398,27 +399,81 @@ class LAVAJobSubmitter(PathResolver):
|
||||
lava_yaml.dump(generate_lava_yaml_payload(self), job_definition_stream)
|
||||
job_definition = job_definition_stream.getvalue()
|
||||
|
||||
self.dump(job_definition)
|
||||
if self.dump_yaml:
|
||||
self.dump_job_definition(job_definition)
|
||||
|
||||
job = LAVAJob(proxy, job_definition)
|
||||
if errors := job.validate():
|
||||
validation_job = LAVAJob(self.proxy, job_definition)
|
||||
if errors := validation_job.validate():
|
||||
fatal_err(f"Error in LAVA job definition: {errors}")
|
||||
print_log("LAVA job definition validated successfully")
|
||||
|
||||
return job_definition
|
||||
|
||||
@classmethod
|
||||
def is_under_ci(cls):
|
||||
ci_envvar: str = getenv("CI", "false")
|
||||
return ci_envvar.lower() == "true"
|
||||
|
||||
def dump_job_definition(self, job_definition) -> None:
|
||||
with GitlabSection(
|
||||
"yaml_dump",
|
||||
"LAVA job definition (YAML)",
|
||||
type=LogSectionType.LAVA_BOOT,
|
||||
start_collapsed=True,
|
||||
):
|
||||
print(hide_sensitive_data(job_definition))
|
||||
|
||||
def submit(self) -> None:
|
||||
"""
|
||||
Prepares and submits the LAVA job.
|
||||
If `validate_only` is True, it validates the job without submitting it.
|
||||
If the job finishes with a non-pass status or encounters an exception,
|
||||
the program exits with a non-zero return code.
|
||||
"""
|
||||
job_definition: str = self.__prepare_submission()
|
||||
|
||||
if self.validate_only:
|
||||
return
|
||||
|
||||
with self.__structured_log_context:
|
||||
last_attempt_job = None
|
||||
try:
|
||||
finished_job = retriable_follow_job(proxy, job_definition)
|
||||
exit_code = 0 if finished_job.status == "pass" else 1
|
||||
STRUCTURAL_LOG["job_combined_status"] = job.status
|
||||
sys.exit(exit_code)
|
||||
last_attempt_job = retriable_follow_job(self.proxy, job_definition)
|
||||
|
||||
except MesaCIRetryError as retry_exception:
|
||||
last_attempt_job = retry_exception.last_job
|
||||
|
||||
except Exception as exception:
|
||||
STRUCTURAL_LOG["job_combined_fail_reason"] = str(exception)
|
||||
raise exception
|
||||
|
||||
finally:
|
||||
self.finish_script(last_attempt_job)
|
||||
|
||||
def print_log_artifact_url(self):
|
||||
base_url = "https://$CI_PROJECT_ROOT_NAMESPACE.pages.freedesktop.org/"
|
||||
artifacts_path = "-/$CI_PROJECT_NAME/-/jobs/$CI_JOB_ID/artifacts/"
|
||||
relative_log_path = self.structured_log_file.relative_to(pathlib.Path.cwd())
|
||||
full_path = f"{base_url}{artifacts_path}{relative_log_path}"
|
||||
artifact_url = path.expandvars(full_path)
|
||||
|
||||
print_log(f"Structural Logging data available at: {artifact_url}")
|
||||
|
||||
def finish_script(self, last_attempt_job):
|
||||
if self.is_under_ci() and self.structured_log_file:
|
||||
self.print_log_artifact_url()
|
||||
|
||||
if not last_attempt_job:
|
||||
# No job was run, something bad happened
|
||||
STRUCTURAL_LOG["job_combined_status"] = "script_crash"
|
||||
current_exception = str(sys.exc_info()[0])
|
||||
STRUCTURAL_LOG["job_combined_fail_reason"] = current_exception
|
||||
raise SystemExit(1)
|
||||
|
||||
STRUCTURAL_LOG["job_combined_status"] = last_attempt_job.status
|
||||
|
||||
if last_attempt_job.status != "pass":
|
||||
raise SystemExit(1)
|
||||
|
||||
class StructuredLoggerWrapper:
|
||||
def __init__(self, submitter: LAVAJobSubmitter) -> None:
|
||||
|
@@ -34,6 +34,7 @@ class LAVAJob:
|
||||
self._is_finished = False
|
||||
self.log: dict[str, Any] = log
|
||||
self.status = "not_submitted"
|
||||
self.__exception: Optional[str] = None
|
||||
|
||||
def heartbeat(self) -> None:
|
||||
self.last_log_time: datetime = datetime.now()
|
||||
@@ -61,6 +62,15 @@ class LAVAJob:
|
||||
def is_finished(self) -> bool:
|
||||
return self._is_finished
|
||||
|
||||
@property
|
||||
def exception(self) -> str:
|
||||
return self.__exception
|
||||
|
||||
@exception.setter
|
||||
def exception(self, exception: Exception) -> None:
|
||||
self.__exception = repr(exception)
|
||||
self.log["dut_job_fail_reason"] = self.__exception
|
||||
|
||||
def validate(self) -> Optional[dict]:
|
||||
"""Returns a dict with errors, if the validation fails.
|
||||
|
||||
@@ -158,6 +168,10 @@ class LAVAJob:
|
||||
|
||||
def handle_exception(self, exception: Exception):
|
||||
print_log(exception)
|
||||
self.cancel()
|
||||
self.exception = exception
|
||||
|
||||
# Give more accurate status depending on exception
|
||||
if isinstance(exception, MesaCIKnownIssueException):
|
||||
self.status = "canceled"
|
||||
elif isinstance(exception, MesaCITimeoutError):
|
||||
@@ -165,11 +179,8 @@ class LAVAJob:
|
||||
elif isinstance(exception, MesaCIException):
|
||||
self.status = "failed"
|
||||
elif isinstance(exception, KeyboardInterrupt):
|
||||
self.status = "canceled_by_user"
|
||||
self.status = "interrupted"
|
||||
print_log("LAVA job submitter was interrupted. Cancelling the job.")
|
||||
raise exception
|
||||
raise
|
||||
else:
|
||||
self.status = "job_submitter_error"
|
||||
|
||||
self.cancel()
|
||||
self.log["dut_job_fail_reason"] = str(exception)
|
||||
|
@@ -5,11 +5,13 @@
|
||||
#
|
||||
# SPDX-License-Identifier: MIT
|
||||
|
||||
import os
|
||||
import xmlrpc.client
|
||||
from contextlib import nullcontext as does_not_raise
|
||||
from datetime import datetime
|
||||
from itertools import chain, repeat
|
||||
from pathlib import Path
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
import pytest
|
||||
from lava.exceptions import MesaCIException, MesaCIRetryError
|
||||
@@ -50,6 +52,32 @@ def mock_proxy_waiting_time(mock_proxy):
|
||||
return update_mock_proxy
|
||||
|
||||
|
||||
@pytest.fixture(params=[{"CI": "true"}, {"CI": "false"}], ids=["Under CI", "Local run"])
|
||||
def ci_environment(request):
|
||||
with patch.dict(os.environ, request.param):
|
||||
yield
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def lava_job_submitter(
|
||||
ci_environment,
|
||||
tmp_path,
|
||||
mock_proxy,
|
||||
):
|
||||
os.chdir(tmp_path)
|
||||
tmp_file = Path(tmp_path) / "log.json"
|
||||
|
||||
with patch("lava.lava_job_submitter.setup_lava_proxy") as mock_setup_lava_proxy:
|
||||
mock_setup_lava_proxy.return_value = mock_proxy()
|
||||
yield LAVAJobSubmitter(
|
||||
boot_method="test_boot",
|
||||
ci_project_dir="test_dir",
|
||||
device_type="test_device",
|
||||
job_timeout_min=1,
|
||||
structured_log_file=tmp_file,
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.parametrize("exception", [RuntimeError, SystemError, KeyError])
|
||||
def test_submit_and_follow_respects_exceptions(mock_sleep, mock_proxy, exception):
|
||||
with pytest.raises(MesaCIException):
|
||||
@@ -301,7 +329,7 @@ def test_parse_job_result_from_log(message, expectation, mock_proxy):
|
||||
@pytest.mark.slow(
|
||||
reason="Slow and sketchy test. Needs a LAVA log raw file at /tmp/log.yaml"
|
||||
)
|
||||
def test_full_yaml_log(mock_proxy, frozen_time, tmp_path):
|
||||
def test_full_yaml_log(mock_proxy, frozen_time, lava_job_submitter):
|
||||
import random
|
||||
|
||||
from lavacli.utils import flow_yaml as lava_yaml
|
||||
@@ -352,10 +380,61 @@ def test_full_yaml_log(mock_proxy, frozen_time, tmp_path):
|
||||
def reset_logs(*args):
|
||||
proxy.scheduler.jobs.logs.side_effect = load_lines()
|
||||
|
||||
tmp_file = Path(tmp_path) / "log.json"
|
||||
LAVAJobSubmitter(structured_log_file=tmp_file)
|
||||
proxy.scheduler.jobs.submit = reset_logs
|
||||
with pytest.raises(MesaCIRetryError):
|
||||
time_travel_to_test_time()
|
||||
lava_job_submitter.submit()
|
||||
retriable_follow_job(proxy, "")
|
||||
print(tmp_file.read_text())
|
||||
print(lava_job_submitter.structured_log_file.read_text())
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"validate_only,finished_job_status,expected_combined_status,expected_exit_code",
|
||||
[
|
||||
(True, "pass", None, None),
|
||||
(False, "pass", "pass", 0),
|
||||
(False, "fail", "fail", 1),
|
||||
],
|
||||
ids=[
|
||||
"validate_only_no_job_submission",
|
||||
"successful_job_submission",
|
||||
"failed_job_submission",
|
||||
],
|
||||
)
|
||||
def test_job_combined_status(
|
||||
lava_job_submitter,
|
||||
validate_only,
|
||||
finished_job_status,
|
||||
expected_combined_status,
|
||||
expected_exit_code,
|
||||
):
|
||||
lava_job_submitter.validate_only = validate_only
|
||||
|
||||
with patch(
|
||||
"lava.lava_job_submitter.retriable_follow_job"
|
||||
) as mock_retriable_follow_job, patch(
|
||||
"lava.lava_job_submitter.LAVAJobSubmitter._LAVAJobSubmitter__prepare_submission"
|
||||
) as mock_prepare_submission, patch(
|
||||
"sys.exit"
|
||||
):
|
||||
from lava.lava_job_submitter import STRUCTURAL_LOG
|
||||
|
||||
mock_retriable_follow_job.return_value = MagicMock(status=finished_job_status)
|
||||
|
||||
mock_job_definition = MagicMock(spec=str)
|
||||
mock_prepare_submission.return_value = mock_job_definition
|
||||
original_status: str = STRUCTURAL_LOG.get("job_combined_status")
|
||||
|
||||
if validate_only:
|
||||
lava_job_submitter.submit()
|
||||
mock_retriable_follow_job.assert_not_called()
|
||||
assert STRUCTURAL_LOG.get("job_combined_status") == original_status
|
||||
return
|
||||
|
||||
try:
|
||||
lava_job_submitter.submit()
|
||||
|
||||
except SystemExit as e:
|
||||
assert e.code == expected_exit_code
|
||||
|
||||
assert STRUCTURAL_LOG["job_combined_status"] == expected_combined_status
|
||||
|
Reference in New Issue
Block a user