ci: Add CustomLogger class and CLI tool

This commit introduces the CustomLogger class, which provides methods
for updating, creating, and managing dut jobs and phases in a structured
log in below json format.
{
  "_timestamp": "2023-10-05T06:16:42.603921",
  "dut_job_type": "rpi3",
  "farm": "igalia",
  "dut_jobs": [
    {
      "status": "pass",
      "submitter_start_time": "2023-10-05T06:16:42.745862",
      "dut_start_time": "2023-10-05T06:16:42.819964",
      "dut_submit_time": "2023-10-05T06:16:45.866096",
      "dut_end_time": "2023-10-05T06:24:13.533394",
      "dut_name": "igalia-ci01-rpi3-1gb",
      "dut_state": "finished",
      "dut_job_phases": [
        {
          "name": "boot",
          "start_time": "2023-10-05T06:16:45.865863",
          "end_time": "2023-10-05T06:17:14.801002"
        },
        {
          "name": "test",
          "start_time": "2023-10-05T06:17:14.801009",
          "end_time": "2023-10-05T06:24:13.610296"
        }
      ],
      "submitter_end_time": "2023-10-05T06:24:13.680729"
    }
  ],
  "job_combined_status": "pass",
  "dut_attempt_counter": 1
}

This class uses the existing StructuredLogger module,
which provides a robust and flexible logging utility supporting multiple
formats. It also includes a command-line tool for updating, creating,
and modifying dut jobs and phases through command-line arguments.
This can be used in ci job scripts to update information in structured logs.
Unit tests also have been added for the new class.

Currently, only LAVA jobs create structured log files, and this will be
extended for other jobs using this tool.

Signed-off-by: Vignesh Raman <vignesh.raman@collabora.com>
Part-of: <https://gitlab.freedesktop.org/mesa/mesa/-/merge_requests/25179>
This commit is contained in:
Vignesh Raman
2023-08-25 15:10:07 +05:30
committed by Marge Bot
parent b5153693cc
commit aa0c4078de
2 changed files with 1003 additions and 0 deletions

334
bin/ci/custom_logger.py Normal file
View File

@@ -0,0 +1,334 @@
import argparse
import logging
from datetime import datetime
from pathlib import Path
from structured_logger import StructuredLogger
class CustomLogger:
def __init__(self, log_file):
self.log_file = log_file
self.logger = StructuredLogger(file_name=self.log_file)
def get_last_dut_job(self):
"""
Gets the details of the most recent DUT job.
Returns:
dict: Details of the most recent DUT job.
Raises:
ValueError: If no DUT jobs are found in the logger's data.
"""
try:
job = self.logger.data["dut_jobs"][-1]
except KeyError:
raise ValueError(
"No DUT jobs found. Please create a job via create_dut_job call."
)
return job
def update(self, **kwargs):
"""
Updates the log file with provided key-value pairs.
Args:
**kwargs: Key-value pairs to be updated.
"""
with self.logger.edit_context():
for key, value in kwargs.items():
self.logger.data[key] = value
def create_dut_job(self, **kwargs):
"""
Creates a new DUT job with provided key-value pairs.
Args:
**kwargs: Key-value pairs for the new DUT job.
"""
with self.logger.edit_context():
if "dut_jobs" not in self.logger.data:
self.logger.data["dut_jobs"] = []
new_job = {
"status": "",
"submitter_start_time": datetime.now().isoformat(),
"dut_submit_time": "",
"dut_start_time": "",
"dut_end_time": "",
"dut_name": "",
"dut_state": "pending",
"dut_job_phases": [],
**kwargs,
}
self.logger.data["dut_jobs"].append(new_job)
def update_dut_job(self, key, value):
"""
Updates the last DUT job with a key-value pair.
Args:
key : The key to be updated.
value: The value to be assigned.
"""
with self.logger.edit_context():
job = self.get_last_dut_job()
job[key] = value
def update_status_fail(self, reason=""):
"""
Sets the status of the last DUT job to 'fail' and logs the failure reason.
Args:
reason (str, optional): The reason for the failure. Defaults to "".
"""
with self.logger.edit_context():
job = self.get_last_dut_job()
job["status"] = "fail"
job["dut_job_fail_reason"] = reason
def create_job_phase(self, phase_name):
"""
Creates a new job phase for the last DUT job.
Args:
phase_name : The name of the new job phase.
"""
with self.logger.edit_context():
job = self.get_last_dut_job()
if job["dut_job_phases"] and job["dut_job_phases"][-1]["end_time"] == "":
# If the last phase exists and its end time is empty, set the end time
job["dut_job_phases"][-1]["end_time"] = datetime.now().isoformat()
# Create a new phase
phase_data = {
"name": phase_name,
"start_time": datetime.now().isoformat(),
"end_time": "",
}
job["dut_job_phases"].append(phase_data)
def check_dut_timings(self, job):
"""
Check the timing sequence of a job to ensure logical consistency.
The function verifies that the job's submission time is not earlier than its start time and that
the job's end time is not earlier than its start time. If either of these conditions is found to be true,
an error is logged for each instance of inconsistency.
Args:
job (dict): A dictionary containing timing information of a job. Expected keys are 'dut_start_time',
'dut_submit_time', and 'dut_end_time'.
Returns:
None: This function does not return a value; it logs errors if timing inconsistencies are detected.
The function checks the following:
- If 'dut_start_time' and 'dut_submit_time' are both present and correctly sequenced.
- If 'dut_start_time' and 'dut_end_time' are both present and correctly sequenced.
"""
# Check if the start time and submit time exist
if job.get("dut_start_time") and job.get("dut_submit_time"):
# If they exist, check if the submission time is before the start time
if job["dut_start_time"] < job["dut_submit_time"]:
logging.error("Job submission is happening before job start.")
# Check if the start time and end time exist
if job.get("dut_start_time") and job.get("dut_end_time"):
# If they exist, check if the end time is after the start time
if job["dut_end_time"] < job["dut_start_time"]:
logging.error("Job ended before it started.")
# Method to update DUT start, submit and end time
def update_dut_time(self, value, custom_time):
"""
Updates DUT start, submit, and end times.
Args:
value : Specifies which DUT time to update. Options: 'start', 'submit', 'end'.
custom_time : Custom time to set. If None, use current time.
Raises:
ValueError: If an invalid argument is provided for value.
"""
with self.logger.edit_context():
job = self.get_last_dut_job()
timestamp = custom_time if custom_time else datetime.now().isoformat()
if value == "start":
job["dut_start_time"] = timestamp
job["dut_state"] = "running"
elif value == "submit":
job["dut_submit_time"] = timestamp
job["dut_state"] = "submitted"
elif value == "end":
job["dut_end_time"] = timestamp
job["dut_state"] = "finished"
else:
raise ValueError(
"Error: Invalid argument provided for --update-dut-time. Use 'start', 'submit', 'end'."
)
# check the sanity of the partial structured log
self.check_dut_timings(job)
def close_dut_job(self):
"""
Closes the most recent DUT (Device Under Test) job in the logger's data.
The method performs the following operations:
1. Validates if there are any DUT jobs in the logger's data.
2. If the last phase of the most recent DUT job has an empty end time, it sets the end time to the current time.
Raises:
ValueError: If no DUT jobs are found in the logger's data.
"""
with self.logger.edit_context():
job = self.get_last_dut_job()
# Check if the last phase exists and its end time is empty, then set the end time
if job["dut_job_phases"] and job["dut_job_phases"][-1]["end_time"] == "":
job["dut_job_phases"][-1]["end_time"] = datetime.now().isoformat()
def close(self):
"""
Closes the most recent DUT (Device Under Test) job in the logger's data.
The method performs the following operations:
1. Determines the combined status of all DUT jobs.
2. Sets the submitter's end time to the current time.
3. Updates the DUT attempt counter to reflect the total number of DUT jobs.
"""
with self.logger.edit_context():
job_status = []
for job in self.logger.data["dut_jobs"]:
if "status" in job:
job_status.append(job["status"])
if not job_status:
job_combined_status = "null"
else:
# Get job_combined_status
if "pass" in job_status:
job_combined_status = "pass"
else:
job_combined_status = "fail"
self.logger.data["job_combined_status"] = job_combined_status
self.logger.data["dut_attempt_counter"] = len(self.logger.data["dut_jobs"])
job["submitter_end_time"] = datetime.now().isoformat()
def process_args(args):
# Function to process key-value pairs and call corresponding logger methods
def process_key_value_pairs(args_list, action_func):
if not args_list:
raise ValueError(
f"No key-value pairs provided for {action_func.__name__.replace('_', '-')}"
)
if len(args_list) % 2 != 0:
raise ValueError(
f"Incomplete key-value pairs for {action_func.__name__.replace('_', '-')}"
)
kwargs = dict(zip(args_list[::2], args_list[1::2]))
action_func(**kwargs)
# Create a CustomLogger object with the specified log file path
custom_logger = CustomLogger(Path(args.log_file))
if args.update:
process_key_value_pairs(args.update, custom_logger.update)
if args.create_dut_job:
process_key_value_pairs(args.create_dut_job, custom_logger.create_dut_job)
if args.update_dut_job:
key, value = args.update_dut_job
custom_logger.update_dut_job(key, value)
if args.create_job_phase:
custom_logger.create_job_phase(args.create_job_phase)
if args.update_status_fail:
custom_logger.update_status_fail(args.update_status_fail)
if args.update_dut_time:
if len(args.update_dut_time) == 2:
action, custom_time = args.update_dut_time
elif len(args.update_dut_time) == 1:
action, custom_time = args.update_dut_time[0], None
else:
raise ValueError("Invalid number of values for --update-dut-time")
if action in ["start", "end", "submit"]:
custom_logger.update_dut_time(action, custom_time)
else:
raise ValueError(
"Error: Invalid argument provided for --update-dut-time. Use 'start', 'submit', 'end'."
)
if args.close_dut_job:
custom_logger.close_dut_job()
if args.close:
custom_logger.close()
def main():
parser = argparse.ArgumentParser(description="Custom Logger Command Line Tool")
parser.add_argument("log_file", help="Path to the log file")
parser.add_argument(
"--update",
nargs=argparse.ZERO_OR_MORE,
metavar=("key", "value"),
help="Update a key-value pair e.g., --update key1 value1 key2 value2)",
)
parser.add_argument(
"--create-dut-job",
nargs=argparse.ZERO_OR_MORE,
metavar=("key", "value"),
help="Create a new DUT job with key-value pairs (e.g., --create-dut-job key1 value1 key2 value2)",
)
parser.add_argument(
"--update-dut-job",
nargs=argparse.ZERO_OR_MORE,
metavar=("key", "value"),
help="Update a key-value pair in DUT job",
)
parser.add_argument(
"--create-job-phase",
help="Create a new job phase (e.g., --create-job-phase name)",
)
parser.add_argument(
"--update-status-fail",
help="Update fail as the status and log the failure reason (e.g., --update-status-fail reason)",
)
parser.add_argument(
"--update-dut-time",
nargs=argparse.ZERO_OR_MORE,
metavar=("action", "custom_time"),
help="Update DUT start and end time. Provide action ('start', 'submit', 'end') and custom_time (e.g., '2023-01-01T12:00:00')",
)
parser.add_argument(
"--close-dut-job",
action="store_true",
help="Close the dut job by updating end time of last dut job)",
)
parser.add_argument(
"--close",
action="store_true",
help="Updates combined status, submitter's end time and DUT attempt counter",
)
args = parser.parse_args()
process_args(args)
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,669 @@
import logging
import subprocess
from datetime import datetime
import pytest
from custom_logger import CustomLogger
@pytest.fixture
def tmp_log_file(tmp_path):
return tmp_path / "test_log.json"
@pytest.fixture
def custom_logger(tmp_log_file):
return CustomLogger(tmp_log_file)
def run_script_with_args(args):
import custom_logger
script_path = custom_logger.__file__
return subprocess.run(
["python3", str(script_path), *args], capture_output=True, text=True
)
# Test case for missing log file
@pytest.mark.parametrize(
"key, value", [("dut_attempt_counter", "1"), ("job_combined_status", "pass")]
)
def test_missing_log_file_argument(key, value):
result = run_script_with_args(["--update", "key", "value"])
assert result.returncode != 0
# Parametrize test case for valid update arguments
@pytest.mark.parametrize(
"key, value", [("dut_attempt_counter", "1"), ("job_combined_status", "pass")]
)
def test_update_argument_valid(custom_logger, tmp_log_file, key, value):
result = run_script_with_args([str(tmp_log_file), "--update", key, value])
assert result.returncode == 0
# Test case for passing only the key without a value
def test_update_argument_key_only(custom_logger, tmp_log_file):
key = "dut_attempt_counter"
result = run_script_with_args([str(tmp_log_file), "--update", key])
assert result.returncode != 0
# Test case for not passing any key-value pair
def test_update_argument_no_values(custom_logger, tmp_log_file):
result = run_script_with_args([str(tmp_log_file), "--update"])
assert result.returncode == 0
# Parametrize test case for valid arguments
@pytest.mark.parametrize(
"key, value", [("dut_attempt_counter", "1"), ("job_combined_status", "pass")]
)
def test_create_argument_valid(custom_logger, tmp_log_file, key, value):
result = run_script_with_args([str(tmp_log_file), "--create-dut-job", key, value])
assert result.returncode == 0
# Test case for passing only the key without a value
def test_create_argument_key_only(custom_logger, tmp_log_file):
key = "dut_attempt_counter"
result = run_script_with_args([str(tmp_log_file), "--create-dut-job", key])
assert result.returncode != 0
# Test case for not passing any key-value pair
def test_create_argument_no_values(custom_logger, tmp_log_file):
result = run_script_with_args([str(tmp_log_file), "--create-dut-job"])
assert result.returncode == 0
# Test case for updating a DUT job
@pytest.mark.parametrize(
"key, value", [("status", "hung"), ("dut_state", "Canceling"), ("dut_name", "asus")]
)
def test_update_dut_job(custom_logger, tmp_log_file, key, value):
result = run_script_with_args([str(tmp_log_file), "--update-dut-job", key, value])
assert result.returncode != 0
result = run_script_with_args([str(tmp_log_file), "--create-dut-job", key, value])
assert result.returncode == 0
result = run_script_with_args([str(tmp_log_file), "--update-dut-job", key, value])
assert result.returncode == 0
# Test case for updating last DUT job
def test_update_dut_multiple_job(custom_logger, tmp_log_file):
# Create the first DUT job with the first key
result = run_script_with_args(
[str(tmp_log_file), "--create-dut-job", "status", "hung"]
)
assert result.returncode == 0
# Create the second DUT job with the second key
result = run_script_with_args(
[str(tmp_log_file), "--create-dut-job", "dut_state", "Canceling"]
)
assert result.returncode == 0
result = run_script_with_args(
[str(tmp_log_file), "--update-dut-job", "dut_name", "asus"]
)
assert result.returncode == 0
# Parametrize test case for valid phase arguments
@pytest.mark.parametrize(
"phase_name",
[("Phase1"), ("Phase2"), ("Phase3")],
)
def test_create_job_phase_valid(custom_logger, tmp_log_file, phase_name):
custom_logger.create_dut_job(status="pass")
result = run_script_with_args([str(tmp_log_file), "--create-job-phase", phase_name])
assert result.returncode == 0
# Test case for not passing any arguments for create-job-phase
def test_create_job_phase_no_arguments(custom_logger, tmp_log_file):
custom_logger.create_dut_job(status="pass")
result = run_script_with_args([str(tmp_log_file), "--create-job-phase"])
assert result.returncode != 0
# Test case for trying to create a phase job without an existing DUT job
def test_create_job_phase_no_dut_job(custom_logger, tmp_log_file):
phase_name = "Phase1"
result = run_script_with_args([str(tmp_log_file), "--create-job-phase", phase_name])
assert result.returncode != 0
# Combined test cases for valid scenarios
def test_valid_scenarios(custom_logger, tmp_log_file):
valid_update_args = [("dut_attempt_counter", "1"), ("job_combined_status", "pass")]
for key, value in valid_update_args:
result = run_script_with_args([str(tmp_log_file), "--update", key, value])
assert result.returncode == 0
valid_create_args = [
("status", "hung"),
("dut_state", "Canceling"),
("dut_name", "asus"),
("phase_name", "Bootloader"),
]
for key, value in valid_create_args:
result = run_script_with_args(
[str(tmp_log_file), "--create-dut-job", key, value]
)
assert result.returncode == 0
result = run_script_with_args(
[str(tmp_log_file), "--create-dut-job", "status", "hung"]
)
assert result.returncode == 0
result = run_script_with_args(
[str(tmp_log_file), "--update-dut-job", "dut_name", "asus"]
)
assert result.returncode == 0
result = run_script_with_args(
[
str(tmp_log_file),
"--create-job-phase",
"phase_name",
]
)
assert result.returncode == 0
# Parametrize test case for valid update arguments
@pytest.mark.parametrize(
"key, value", [("dut_attempt_counter", "1"), ("job_combined_status", "pass")]
)
def test_update(custom_logger, key, value):
custom_logger.update(**{key: value})
logger_data = custom_logger.logger.data
assert key in logger_data
assert logger_data[key] == value
# Test case for updating with a key that already exists
def test_update_existing_key(custom_logger):
key = "status"
value = "new_value"
custom_logger.logger.data[key] = "old_value"
custom_logger.update(**{key: value})
logger_data = custom_logger.logger.data
assert key in logger_data
assert logger_data[key] == value
# Test case for updating "dut_jobs"
def test_update_dut_jobs(custom_logger):
key1 = "status"
value1 = "fail"
key2 = "state"
value2 = "hung"
custom_logger.create_dut_job(**{key1: value1})
logger_data = custom_logger.logger.data
job1 = logger_data["dut_jobs"][0]
assert key1 in job1
assert job1[key1] == value1
custom_logger.update_dut_job(key2, value2)
logger_data = custom_logger.logger.data
job2 = logger_data["dut_jobs"][0]
assert key2 in job2
assert job2[key2] == value2
# Test case for creating and updating DUT job
def test_create_dut_job(custom_logger):
key = "status"
value1 = "pass"
value2 = "fail"
value3 = "hung"
reason = "job_combined_status"
result = "Finished"
custom_logger.update(**{reason: result})
logger_data = custom_logger.logger.data
assert reason in logger_data
assert logger_data[reason] == result
# Create the first DUT job
custom_logger.create_dut_job(**{key: value1})
logger_data = custom_logger.logger.data
assert "dut_jobs" in logger_data
assert isinstance(logger_data["dut_jobs"], list)
assert len(logger_data["dut_jobs"]) == 1
assert isinstance(logger_data["dut_jobs"][0], dict)
# Check the values of the keys in the created first DUT job
job1 = logger_data["dut_jobs"][0]
assert key in job1
assert job1[key] == value1
# Create the second DUT job
custom_logger.create_dut_job(**{key: value2})
logger_data = custom_logger.logger.data
assert "dut_jobs" in logger_data
assert isinstance(logger_data["dut_jobs"], list)
assert len(logger_data["dut_jobs"]) == 2
assert isinstance(logger_data["dut_jobs"][1], dict)
# Check the values of the keys in the created second DUT job
job2 = logger_data["dut_jobs"][1]
assert key in job2
assert job2[key] == value2
# Update the second DUT job with value3
custom_logger.update_dut_job(key, value3)
logger_data = custom_logger.logger.data
# Check the updated value in the second DUT job
job2 = logger_data["dut_jobs"][1]
assert key in job2
assert job2[key] == value3
# Find the index of the last DUT job
last_job_index = len(logger_data["dut_jobs"]) - 1
# Update the last DUT job
custom_logger.update_dut_job("dut_name", "asus")
logger_data = custom_logger.logger.data
# Check the updated value in the last DUT job
job2 = logger_data["dut_jobs"][last_job_index]
assert "dut_name" in job2
assert job2["dut_name"] == "asus"
# Check that "dut_name" is not present in other DUT jobs
for idx, job in enumerate(logger_data["dut_jobs"]):
if idx != last_job_index:
assert job.get("dut_name") == ""
# Test case for updating with missing "dut_jobs" key
def test_update_dut_job_missing_dut_jobs(custom_logger):
key = "status"
value = "fail"
# Attempt to update a DUT job when "dut_jobs" is missing
with pytest.raises(ValueError, match="No DUT jobs found."):
custom_logger.update_dut_job(key, value)
# Test case for creating a job phase
def test_create_job_phase(custom_logger):
custom_logger.create_dut_job(status="pass")
phase_name = "Phase1"
custom_logger.create_job_phase(phase_name)
logger_data = custom_logger.logger.data
assert "dut_jobs" in logger_data
assert isinstance(logger_data["dut_jobs"], list)
assert len(logger_data["dut_jobs"]) == 1
job = logger_data["dut_jobs"][0]
assert "dut_job_phases" in job
assert isinstance(job["dut_job_phases"], list)
assert len(job["dut_job_phases"]) == 1
phase = job["dut_job_phases"][0]
assert phase["name"] == phase_name
try:
datetime.fromisoformat(phase["start_time"])
assert True
except ValueError:
assert False
assert phase["end_time"] == ""
# Test case for creating multiple phase jobs
def test_create_multiple_phase_jobs(custom_logger):
custom_logger.create_dut_job(status="pass")
phase_data = [
{
"phase_name": "Phase1",
},
{
"phase_name": "Phase2",
},
{
"phase_name": "Phase3",
},
]
for data in phase_data:
phase_name = data["phase_name"]
custom_logger.create_job_phase(phase_name)
logger_data = custom_logger.logger.data
assert "dut_jobs" in logger_data
assert isinstance(logger_data["dut_jobs"], list)
assert len(logger_data["dut_jobs"]) == 1
job = logger_data["dut_jobs"][0]
assert "dut_job_phases" in job
assert isinstance(job["dut_job_phases"], list)
assert len(job["dut_job_phases"]) == len(phase_data)
for data in phase_data:
phase_name = data["phase_name"]
phase = job["dut_job_phases"][phase_data.index(data)]
assert phase["name"] == phase_name
try:
datetime.fromisoformat(phase["start_time"])
assert True
except ValueError:
assert False
if phase_data.index(data) != len(phase_data) - 1:
try:
datetime.fromisoformat(phase["end_time"])
assert True
except ValueError:
assert False
# Check if the end_time of the last phase is an empty string
last_phase = job["dut_job_phases"][-1]
assert last_phase["end_time"] == ""
# Test case for creating multiple dut jobs and updating phase job for last dut job
def test_create_two_dut_jobs_and_add_phase(custom_logger):
# Create the first DUT job
custom_logger.create_dut_job(status="pass")
# Create the second DUT job
custom_logger.create_dut_job(status="fail")
logger_data = custom_logger.logger.data
assert "dut_jobs" in logger_data
assert isinstance(logger_data["dut_jobs"], list)
assert len(logger_data["dut_jobs"]) == 2
first_dut_job = logger_data["dut_jobs"][0]
second_dut_job = logger_data["dut_jobs"][1]
# Add a phase to the second DUT job
custom_logger.create_job_phase("Phase1")
logger_data = custom_logger.logger.data
assert "dut_jobs" in logger_data
assert isinstance(logger_data["dut_jobs"], list)
assert len(logger_data["dut_jobs"]) == 2
first_dut_job = logger_data["dut_jobs"][0]
second_dut_job = logger_data["dut_jobs"][1]
# Check first DUT job does not have a phase
assert not first_dut_job.get("dut_job_phases")
# Check second DUT job has a phase
assert second_dut_job.get("dut_job_phases")
assert isinstance(second_dut_job["dut_job_phases"], list)
assert len(second_dut_job["dut_job_phases"]) == 1
# Test case for updating DUT start time
def test_update_dut_start_time(custom_logger):
custom_logger.create_dut_job(status="pass")
custom_logger.update_dut_time("start", None)
logger_data = custom_logger.logger.data
assert "dut_jobs" in logger_data
assert len(logger_data["dut_jobs"]) == 1
dut_job = logger_data["dut_jobs"][0]
assert "dut_start_time" in dut_job
assert dut_job["dut_start_time"] != ""
try:
datetime.fromisoformat(dut_job["dut_start_time"])
assert True
except ValueError:
assert False
# Test case for updating DUT submit time
def test_update_dut_submit_time(custom_logger):
custom_time = "2023-11-09T02:37:06Z"
custom_logger.create_dut_job(status="pass")
custom_logger.update_dut_time("submit", custom_time)
logger_data = custom_logger.logger.data
assert "dut_jobs" in logger_data
assert len(logger_data["dut_jobs"]) == 1
dut_job = logger_data["dut_jobs"][0]
assert "dut_submit_time" in dut_job
try:
datetime.fromisoformat(dut_job["dut_submit_time"])
assert True
except ValueError:
assert False
# Test case for updating DUT end time
def test_update_dut_end_time(custom_logger):
custom_logger.create_dut_job(status="pass")
custom_logger.update_dut_time("end", None)
logger_data = custom_logger.logger.data
assert "dut_jobs" in logger_data
assert len(logger_data["dut_jobs"]) == 1
dut_job = logger_data["dut_jobs"][0]
assert "dut_end_time" in dut_job
try:
datetime.fromisoformat(dut_job["dut_end_time"])
assert True
except ValueError:
assert False
# Test case for updating DUT time with invalid value
def test_update_dut_time_invalid_value(custom_logger):
custom_logger.create_dut_job(status="pass")
with pytest.raises(
ValueError,
match="Error: Invalid argument provided for --update-dut-time. Use 'start', 'submit', 'end'.",
):
custom_logger.update_dut_time("invalid_value", None)
# Test case for close_dut_job
def test_close_dut_job(custom_logger):
custom_logger.create_dut_job(status="pass")
custom_logger.create_job_phase("Phase1")
custom_logger.create_job_phase("Phase2")
custom_logger.close_dut_job()
logger_data = custom_logger.logger.data
assert "dut_jobs" in logger_data
assert len(logger_data["dut_jobs"]) == 1
dut_job = logger_data["dut_jobs"][0]
assert "dut_job_phases" in dut_job
dut_job_phases = dut_job["dut_job_phases"]
phase1 = dut_job_phases[0]
assert phase1["name"] == "Phase1"
try:
datetime.fromisoformat(phase1["start_time"])
assert True
except ValueError:
assert False
try:
datetime.fromisoformat(phase1["end_time"])
assert True
except ValueError:
assert False
phase2 = dut_job_phases[1]
assert phase2["name"] == "Phase2"
try:
datetime.fromisoformat(phase2["start_time"])
assert True
except ValueError:
assert False
try:
datetime.fromisoformat(phase2["end_time"])
assert True
except ValueError:
assert False
# Test case for close
def test_close(custom_logger):
custom_logger.create_dut_job(status="pass")
custom_logger.close()
logger_data = custom_logger.logger.data
assert "dut_jobs" in logger_data
assert len(logger_data["dut_jobs"]) == 1
assert "dut_attempt_counter" in logger_data
assert logger_data["dut_attempt_counter"] == len(logger_data["dut_jobs"])
assert "job_combined_status" in logger_data
assert logger_data["job_combined_status"] != ""
dut_job = logger_data["dut_jobs"][0]
assert "submitter_end_time" in dut_job
try:
datetime.fromisoformat(dut_job["submitter_end_time"])
assert True
except ValueError:
assert False
# Test case for updating status to fail with a reason
def test_update_status_fail_with_reason(custom_logger):
custom_logger.create_dut_job()
reason = "kernel panic"
custom_logger.update_status_fail(reason)
logger_data = custom_logger.logger.data
assert "dut_jobs" in logger_data
assert len(logger_data["dut_jobs"]) == 1
dut_job = logger_data["dut_jobs"][0]
assert "status" in dut_job
assert dut_job["status"] == "fail"
assert "dut_job_fail_reason" in dut_job
assert dut_job["dut_job_fail_reason"] == reason
# Test case for updating status to fail without providing a reason
def test_update_status_fail_without_reason(custom_logger):
custom_logger.create_dut_job()
custom_logger.update_status_fail()
# Check if the status is updated and fail reason is empty
logger_data = custom_logger.logger.data
assert "dut_jobs" in logger_data
assert len(logger_data["dut_jobs"]) == 1
dut_job = logger_data["dut_jobs"][0]
assert "status" in dut_job
assert dut_job["status"] == "fail"
assert "dut_job_fail_reason" in dut_job
assert dut_job["dut_job_fail_reason"] == ""
# Test case for check_dut_timings with submission time earlier than start time
def test_check_dut_timings_submission_earlier_than_start(custom_logger, caplog):
custom_logger.create_dut_job()
# Set submission time to be earlier than start time
custom_logger.update_dut_time("start", "2023-01-01T11:00:00")
custom_logger.update_dut_time("submit", "2023-01-01T12:00:00")
logger_data = custom_logger.logger.data
assert "dut_jobs" in logger_data
assert len(logger_data["dut_jobs"]) == 1
job = logger_data["dut_jobs"][0]
# Call check_dut_timings
custom_logger.check_dut_timings(job)
# Check if an error message is logged
assert "Job submission is happening before job start." in caplog.text
# Test case for check_dut_timings with end time earlier than start time
def test_check_dut_timings_end_earlier_than_start(custom_logger, caplog):
custom_logger.create_dut_job()
# Set end time to be earlier than start time
custom_logger.update_dut_time("end", "2023-01-01T11:00:00")
custom_logger.update_dut_time("start", "2023-01-01T12:00:00")
logger_data = custom_logger.logger.data
assert "dut_jobs" in logger_data
assert len(logger_data["dut_jobs"]) == 1
job = logger_data["dut_jobs"][0]
# Call check_dut_timings
custom_logger.check_dut_timings(job)
# Check if an error message is logged
assert "Job ended before it started." in caplog.text
# Test case for check_dut_timings with valid timing sequence
def test_check_dut_timings_valid_timing_sequence(custom_logger, caplog):
custom_logger.create_dut_job()
# Set valid timing sequence
custom_logger.update_dut_time("submit", "2023-01-01T12:00:00")
custom_logger.update_dut_time("start", "2023-01-01T12:30:00")
custom_logger.update_dut_time("end", "2023-01-01T13:00:00")
logger_data = custom_logger.logger.data
assert "dut_jobs" in logger_data
assert len(logger_data["dut_jobs"]) == 1
job = logger_data["dut_jobs"][0]
# Call check_dut_timings
custom_logger.check_dut_timings(job)
# Check that no error messages are logged
assert "Job submission is happening before job start." not in caplog.text
assert "Job ended before it started." not in caplog.text