diff --git a/bin/ci/custom_logger.py b/bin/ci/custom_logger.py new file mode 100644 index 00000000000..7721be2f66e --- /dev/null +++ b/bin/ci/custom_logger.py @@ -0,0 +1,334 @@ +import argparse +import logging +from datetime import datetime +from pathlib import Path + +from structured_logger import StructuredLogger + + +class CustomLogger: + def __init__(self, log_file): + self.log_file = log_file + self.logger = StructuredLogger(file_name=self.log_file) + + def get_last_dut_job(self): + """ + Gets the details of the most recent DUT job. + + Returns: + dict: Details of the most recent DUT job. + + Raises: + ValueError: If no DUT jobs are found in the logger's data. + """ + try: + job = self.logger.data["dut_jobs"][-1] + except KeyError: + raise ValueError( + "No DUT jobs found. Please create a job via create_dut_job call." + ) + + return job + + def update(self, **kwargs): + """ + Updates the log file with provided key-value pairs. + + Args: + **kwargs: Key-value pairs to be updated. + + """ + with self.logger.edit_context(): + for key, value in kwargs.items(): + self.logger.data[key] = value + + def create_dut_job(self, **kwargs): + """ + Creates a new DUT job with provided key-value pairs. + + Args: + **kwargs: Key-value pairs for the new DUT job. + + """ + with self.logger.edit_context(): + if "dut_jobs" not in self.logger.data: + self.logger.data["dut_jobs"] = [] + new_job = { + "status": "", + "submitter_start_time": datetime.now().isoformat(), + "dut_submit_time": "", + "dut_start_time": "", + "dut_end_time": "", + "dut_name": "", + "dut_state": "pending", + "dut_job_phases": [], + **kwargs, + } + self.logger.data["dut_jobs"].append(new_job) + + def update_dut_job(self, key, value): + """ + Updates the last DUT job with a key-value pair. + + Args: + key : The key to be updated. + value: The value to be assigned. + + """ + with self.logger.edit_context(): + job = self.get_last_dut_job() + job[key] = value + + def update_status_fail(self, reason=""): + """ + Sets the status of the last DUT job to 'fail' and logs the failure reason. + + Args: + reason (str, optional): The reason for the failure. Defaults to "". + + """ + with self.logger.edit_context(): + job = self.get_last_dut_job() + job["status"] = "fail" + job["dut_job_fail_reason"] = reason + + def create_job_phase(self, phase_name): + """ + Creates a new job phase for the last DUT job. + + Args: + phase_name : The name of the new job phase. + + """ + with self.logger.edit_context(): + job = self.get_last_dut_job() + if job["dut_job_phases"] and job["dut_job_phases"][-1]["end_time"] == "": + # If the last phase exists and its end time is empty, set the end time + job["dut_job_phases"][-1]["end_time"] = datetime.now().isoformat() + + # Create a new phase + phase_data = { + "name": phase_name, + "start_time": datetime.now().isoformat(), + "end_time": "", + } + job["dut_job_phases"].append(phase_data) + + def check_dut_timings(self, job): + """ + Check the timing sequence of a job to ensure logical consistency. + + The function verifies that the job's submission time is not earlier than its start time and that + the job's end time is not earlier than its start time. If either of these conditions is found to be true, + an error is logged for each instance of inconsistency. + + Args: + job (dict): A dictionary containing timing information of a job. Expected keys are 'dut_start_time', + 'dut_submit_time', and 'dut_end_time'. + + Returns: + None: This function does not return a value; it logs errors if timing inconsistencies are detected. + + The function checks the following: + - If 'dut_start_time' and 'dut_submit_time' are both present and correctly sequenced. + - If 'dut_start_time' and 'dut_end_time' are both present and correctly sequenced. + """ + + # Check if the start time and submit time exist + if job.get("dut_start_time") and job.get("dut_submit_time"): + # If they exist, check if the submission time is before the start time + if job["dut_start_time"] < job["dut_submit_time"]: + logging.error("Job submission is happening before job start.") + + # Check if the start time and end time exist + if job.get("dut_start_time") and job.get("dut_end_time"): + # If they exist, check if the end time is after the start time + if job["dut_end_time"] < job["dut_start_time"]: + logging.error("Job ended before it started.") + + # Method to update DUT start, submit and end time + def update_dut_time(self, value, custom_time): + """ + Updates DUT start, submit, and end times. + + Args: + value : Specifies which DUT time to update. Options: 'start', 'submit', 'end'. + custom_time : Custom time to set. If None, use current time. + + Raises: + ValueError: If an invalid argument is provided for value. + + """ + with self.logger.edit_context(): + job = self.get_last_dut_job() + timestamp = custom_time if custom_time else datetime.now().isoformat() + if value == "start": + job["dut_start_time"] = timestamp + job["dut_state"] = "running" + elif value == "submit": + job["dut_submit_time"] = timestamp + job["dut_state"] = "submitted" + elif value == "end": + job["dut_end_time"] = timestamp + job["dut_state"] = "finished" + else: + raise ValueError( + "Error: Invalid argument provided for --update-dut-time. Use 'start', 'submit', 'end'." + ) + # check the sanity of the partial structured log + self.check_dut_timings(job) + + def close_dut_job(self): + """ + Closes the most recent DUT (Device Under Test) job in the logger's data. + + The method performs the following operations: + 1. Validates if there are any DUT jobs in the logger's data. + 2. If the last phase of the most recent DUT job has an empty end time, it sets the end time to the current time. + + Raises: + ValueError: If no DUT jobs are found in the logger's data. + """ + with self.logger.edit_context(): + job = self.get_last_dut_job() + # Check if the last phase exists and its end time is empty, then set the end time + if job["dut_job_phases"] and job["dut_job_phases"][-1]["end_time"] == "": + job["dut_job_phases"][-1]["end_time"] = datetime.now().isoformat() + + def close(self): + """ + Closes the most recent DUT (Device Under Test) job in the logger's data. + + The method performs the following operations: + 1. Determines the combined status of all DUT jobs. + 2. Sets the submitter's end time to the current time. + 3. Updates the DUT attempt counter to reflect the total number of DUT jobs. + + """ + with self.logger.edit_context(): + job_status = [] + for job in self.logger.data["dut_jobs"]: + if "status" in job: + job_status.append(job["status"]) + + if not job_status: + job_combined_status = "null" + else: + # Get job_combined_status + if "pass" in job_status: + job_combined_status = "pass" + else: + job_combined_status = "fail" + + self.logger.data["job_combined_status"] = job_combined_status + self.logger.data["dut_attempt_counter"] = len(self.logger.data["dut_jobs"]) + job["submitter_end_time"] = datetime.now().isoformat() + + +def process_args(args): + # Function to process key-value pairs and call corresponding logger methods + def process_key_value_pairs(args_list, action_func): + if not args_list: + raise ValueError( + f"No key-value pairs provided for {action_func.__name__.replace('_', '-')}" + ) + if len(args_list) % 2 != 0: + raise ValueError( + f"Incomplete key-value pairs for {action_func.__name__.replace('_', '-')}" + ) + kwargs = dict(zip(args_list[::2], args_list[1::2])) + action_func(**kwargs) + + # Create a CustomLogger object with the specified log file path + custom_logger = CustomLogger(Path(args.log_file)) + + if args.update: + process_key_value_pairs(args.update, custom_logger.update) + + if args.create_dut_job: + process_key_value_pairs(args.create_dut_job, custom_logger.create_dut_job) + + if args.update_dut_job: + key, value = args.update_dut_job + custom_logger.update_dut_job(key, value) + + if args.create_job_phase: + custom_logger.create_job_phase(args.create_job_phase) + + if args.update_status_fail: + custom_logger.update_status_fail(args.update_status_fail) + + if args.update_dut_time: + if len(args.update_dut_time) == 2: + action, custom_time = args.update_dut_time + elif len(args.update_dut_time) == 1: + action, custom_time = args.update_dut_time[0], None + else: + raise ValueError("Invalid number of values for --update-dut-time") + + if action in ["start", "end", "submit"]: + custom_logger.update_dut_time(action, custom_time) + else: + raise ValueError( + "Error: Invalid argument provided for --update-dut-time. Use 'start', 'submit', 'end'." + ) + + if args.close_dut_job: + custom_logger.close_dut_job() + + if args.close: + custom_logger.close() + + +def main(): + parser = argparse.ArgumentParser(description="Custom Logger Command Line Tool") + parser.add_argument("log_file", help="Path to the log file") + parser.add_argument( + "--update", + nargs=argparse.ZERO_OR_MORE, + metavar=("key", "value"), + help="Update a key-value pair e.g., --update key1 value1 key2 value2)", + ) + parser.add_argument( + "--create-dut-job", + nargs=argparse.ZERO_OR_MORE, + metavar=("key", "value"), + help="Create a new DUT job with key-value pairs (e.g., --create-dut-job key1 value1 key2 value2)", + ) + parser.add_argument( + "--update-dut-job", + nargs=argparse.ZERO_OR_MORE, + metavar=("key", "value"), + help="Update a key-value pair in DUT job", + ) + parser.add_argument( + "--create-job-phase", + help="Create a new job phase (e.g., --create-job-phase name)", + ) + parser.add_argument( + "--update-status-fail", + help="Update fail as the status and log the failure reason (e.g., --update-status-fail reason)", + ) + parser.add_argument( + "--update-dut-time", + nargs=argparse.ZERO_OR_MORE, + metavar=("action", "custom_time"), + help="Update DUT start and end time. Provide action ('start', 'submit', 'end') and custom_time (e.g., '2023-01-01T12:00:00')", + ) + parser.add_argument( + "--close-dut-job", + action="store_true", + help="Close the dut job by updating end time of last dut job)", + ) + parser.add_argument( + "--close", + action="store_true", + help="Updates combined status, submitter's end time and DUT attempt counter", + ) + args = parser.parse_args() + + process_args(args) + + +if __name__ == "__main__": + main() diff --git a/bin/ci/test/test_custom_logger.py b/bin/ci/test/test_custom_logger.py new file mode 100644 index 00000000000..98ad9c00494 --- /dev/null +++ b/bin/ci/test/test_custom_logger.py @@ -0,0 +1,669 @@ +import logging +import subprocess +from datetime import datetime + +import pytest +from custom_logger import CustomLogger + + +@pytest.fixture +def tmp_log_file(tmp_path): + return tmp_path / "test_log.json" + + +@pytest.fixture +def custom_logger(tmp_log_file): + return CustomLogger(tmp_log_file) + + +def run_script_with_args(args): + import custom_logger + + script_path = custom_logger.__file__ + return subprocess.run( + ["python3", str(script_path), *args], capture_output=True, text=True + ) + + +# Test case for missing log file +@pytest.mark.parametrize( + "key, value", [("dut_attempt_counter", "1"), ("job_combined_status", "pass")] +) +def test_missing_log_file_argument(key, value): + result = run_script_with_args(["--update", "key", "value"]) + assert result.returncode != 0 + + +# Parametrize test case for valid update arguments +@pytest.mark.parametrize( + "key, value", [("dut_attempt_counter", "1"), ("job_combined_status", "pass")] +) +def test_update_argument_valid(custom_logger, tmp_log_file, key, value): + result = run_script_with_args([str(tmp_log_file), "--update", key, value]) + assert result.returncode == 0 + + +# Test case for passing only the key without a value +def test_update_argument_key_only(custom_logger, tmp_log_file): + key = "dut_attempt_counter" + result = run_script_with_args([str(tmp_log_file), "--update", key]) + assert result.returncode != 0 + + +# Test case for not passing any key-value pair +def test_update_argument_no_values(custom_logger, tmp_log_file): + result = run_script_with_args([str(tmp_log_file), "--update"]) + assert result.returncode == 0 + + +# Parametrize test case for valid arguments +@pytest.mark.parametrize( + "key, value", [("dut_attempt_counter", "1"), ("job_combined_status", "pass")] +) +def test_create_argument_valid(custom_logger, tmp_log_file, key, value): + result = run_script_with_args([str(tmp_log_file), "--create-dut-job", key, value]) + assert result.returncode == 0 + + +# Test case for passing only the key without a value +def test_create_argument_key_only(custom_logger, tmp_log_file): + key = "dut_attempt_counter" + result = run_script_with_args([str(tmp_log_file), "--create-dut-job", key]) + assert result.returncode != 0 + + +# Test case for not passing any key-value pair +def test_create_argument_no_values(custom_logger, tmp_log_file): + result = run_script_with_args([str(tmp_log_file), "--create-dut-job"]) + assert result.returncode == 0 + + +# Test case for updating a DUT job +@pytest.mark.parametrize( + "key, value", [("status", "hung"), ("dut_state", "Canceling"), ("dut_name", "asus")] +) +def test_update_dut_job(custom_logger, tmp_log_file, key, value): + result = run_script_with_args([str(tmp_log_file), "--update-dut-job", key, value]) + assert result.returncode != 0 + + result = run_script_with_args([str(tmp_log_file), "--create-dut-job", key, value]) + assert result.returncode == 0 + + result = run_script_with_args([str(tmp_log_file), "--update-dut-job", key, value]) + assert result.returncode == 0 + + +# Test case for updating last DUT job +def test_update_dut_multiple_job(custom_logger, tmp_log_file): + # Create the first DUT job with the first key + result = run_script_with_args( + [str(tmp_log_file), "--create-dut-job", "status", "hung"] + ) + assert result.returncode == 0 + + # Create the second DUT job with the second key + result = run_script_with_args( + [str(tmp_log_file), "--create-dut-job", "dut_state", "Canceling"] + ) + assert result.returncode == 0 + + result = run_script_with_args( + [str(tmp_log_file), "--update-dut-job", "dut_name", "asus"] + ) + assert result.returncode == 0 + + +# Parametrize test case for valid phase arguments +@pytest.mark.parametrize( + "phase_name", + [("Phase1"), ("Phase2"), ("Phase3")], +) +def test_create_job_phase_valid(custom_logger, tmp_log_file, phase_name): + custom_logger.create_dut_job(status="pass") + + result = run_script_with_args([str(tmp_log_file), "--create-job-phase", phase_name]) + assert result.returncode == 0 + + +# Test case for not passing any arguments for create-job-phase +def test_create_job_phase_no_arguments(custom_logger, tmp_log_file): + custom_logger.create_dut_job(status="pass") + + result = run_script_with_args([str(tmp_log_file), "--create-job-phase"]) + assert result.returncode != 0 + + +# Test case for trying to create a phase job without an existing DUT job +def test_create_job_phase_no_dut_job(custom_logger, tmp_log_file): + phase_name = "Phase1" + + result = run_script_with_args([str(tmp_log_file), "--create-job-phase", phase_name]) + assert result.returncode != 0 + + +# Combined test cases for valid scenarios +def test_valid_scenarios(custom_logger, tmp_log_file): + valid_update_args = [("dut_attempt_counter", "1"), ("job_combined_status", "pass")] + for key, value in valid_update_args: + result = run_script_with_args([str(tmp_log_file), "--update", key, value]) + assert result.returncode == 0 + + valid_create_args = [ + ("status", "hung"), + ("dut_state", "Canceling"), + ("dut_name", "asus"), + ("phase_name", "Bootloader"), + ] + for key, value in valid_create_args: + result = run_script_with_args( + [str(tmp_log_file), "--create-dut-job", key, value] + ) + assert result.returncode == 0 + + result = run_script_with_args( + [str(tmp_log_file), "--create-dut-job", "status", "hung"] + ) + assert result.returncode == 0 + + result = run_script_with_args( + [str(tmp_log_file), "--update-dut-job", "dut_name", "asus"] + ) + assert result.returncode == 0 + + result = run_script_with_args( + [ + str(tmp_log_file), + "--create-job-phase", + "phase_name", + ] + ) + assert result.returncode == 0 + + +# Parametrize test case for valid update arguments +@pytest.mark.parametrize( + "key, value", [("dut_attempt_counter", "1"), ("job_combined_status", "pass")] +) +def test_update(custom_logger, key, value): + custom_logger.update(**{key: value}) + logger_data = custom_logger.logger.data + + assert key in logger_data + assert logger_data[key] == value + + +# Test case for updating with a key that already exists +def test_update_existing_key(custom_logger): + key = "status" + value = "new_value" + custom_logger.logger.data[key] = "old_value" + custom_logger.update(**{key: value}) + logger_data = custom_logger.logger.data + + assert key in logger_data + assert logger_data[key] == value + + +# Test case for updating "dut_jobs" +def test_update_dut_jobs(custom_logger): + key1 = "status" + value1 = "fail" + key2 = "state" + value2 = "hung" + + custom_logger.create_dut_job(**{key1: value1}) + logger_data = custom_logger.logger.data + + job1 = logger_data["dut_jobs"][0] + assert key1 in job1 + assert job1[key1] == value1 + + custom_logger.update_dut_job(key2, value2) + logger_data = custom_logger.logger.data + + job2 = logger_data["dut_jobs"][0] + assert key2 in job2 + assert job2[key2] == value2 + + +# Test case for creating and updating DUT job +def test_create_dut_job(custom_logger): + key = "status" + value1 = "pass" + value2 = "fail" + value3 = "hung" + + reason = "job_combined_status" + result = "Finished" + + custom_logger.update(**{reason: result}) + logger_data = custom_logger.logger.data + + assert reason in logger_data + assert logger_data[reason] == result + + # Create the first DUT job + custom_logger.create_dut_job(**{key: value1}) + logger_data = custom_logger.logger.data + + assert "dut_jobs" in logger_data + assert isinstance(logger_data["dut_jobs"], list) + assert len(logger_data["dut_jobs"]) == 1 + assert isinstance(logger_data["dut_jobs"][0], dict) + + # Check the values of the keys in the created first DUT job + job1 = logger_data["dut_jobs"][0] + assert key in job1 + assert job1[key] == value1 + + # Create the second DUT job + custom_logger.create_dut_job(**{key: value2}) + logger_data = custom_logger.logger.data + + assert "dut_jobs" in logger_data + assert isinstance(logger_data["dut_jobs"], list) + assert len(logger_data["dut_jobs"]) == 2 + assert isinstance(logger_data["dut_jobs"][1], dict) + + # Check the values of the keys in the created second DUT job + job2 = logger_data["dut_jobs"][1] + assert key in job2 + assert job2[key] == value2 + + # Update the second DUT job with value3 + custom_logger.update_dut_job(key, value3) + logger_data = custom_logger.logger.data + + # Check the updated value in the second DUT job + job2 = logger_data["dut_jobs"][1] + assert key in job2 + assert job2[key] == value3 + + # Find the index of the last DUT job + last_job_index = len(logger_data["dut_jobs"]) - 1 + + # Update the last DUT job + custom_logger.update_dut_job("dut_name", "asus") + logger_data = custom_logger.logger.data + + # Check the updated value in the last DUT job + job2 = logger_data["dut_jobs"][last_job_index] + assert "dut_name" in job2 + assert job2["dut_name"] == "asus" + + # Check that "dut_name" is not present in other DUT jobs + for idx, job in enumerate(logger_data["dut_jobs"]): + if idx != last_job_index: + assert job.get("dut_name") == "" + + +# Test case for updating with missing "dut_jobs" key +def test_update_dut_job_missing_dut_jobs(custom_logger): + key = "status" + value = "fail" + + # Attempt to update a DUT job when "dut_jobs" is missing + with pytest.raises(ValueError, match="No DUT jobs found."): + custom_logger.update_dut_job(key, value) + + +# Test case for creating a job phase +def test_create_job_phase(custom_logger): + custom_logger.create_dut_job(status="pass") + phase_name = "Phase1" + + custom_logger.create_job_phase(phase_name) + logger_data = custom_logger.logger.data + + assert "dut_jobs" in logger_data + assert isinstance(logger_data["dut_jobs"], list) + assert len(logger_data["dut_jobs"]) == 1 + + job = logger_data["dut_jobs"][0] + assert "dut_job_phases" in job + assert isinstance(job["dut_job_phases"], list) + assert len(job["dut_job_phases"]) == 1 + + phase = job["dut_job_phases"][0] + assert phase["name"] == phase_name + try: + datetime.fromisoformat(phase["start_time"]) + assert True + except ValueError: + assert False + assert phase["end_time"] == "" + + +# Test case for creating multiple phase jobs +def test_create_multiple_phase_jobs(custom_logger): + custom_logger.create_dut_job(status="pass") + + phase_data = [ + { + "phase_name": "Phase1", + }, + { + "phase_name": "Phase2", + }, + { + "phase_name": "Phase3", + }, + ] + + for data in phase_data: + phase_name = data["phase_name"] + + custom_logger.create_job_phase(phase_name) + + logger_data = custom_logger.logger.data + + assert "dut_jobs" in logger_data + assert isinstance(logger_data["dut_jobs"], list) + assert len(logger_data["dut_jobs"]) == 1 + + job = logger_data["dut_jobs"][0] + assert "dut_job_phases" in job + assert isinstance(job["dut_job_phases"], list) + assert len(job["dut_job_phases"]) == len(phase_data) + + for data in phase_data: + phase_name = data["phase_name"] + + phase = job["dut_job_phases"][phase_data.index(data)] + + assert phase["name"] == phase_name + try: + datetime.fromisoformat(phase["start_time"]) + assert True + except ValueError: + assert False + + if phase_data.index(data) != len(phase_data) - 1: + try: + datetime.fromisoformat(phase["end_time"]) + assert True + except ValueError: + assert False + + # Check if the end_time of the last phase is an empty string + last_phase = job["dut_job_phases"][-1] + assert last_phase["end_time"] == "" + + +# Test case for creating multiple dut jobs and updating phase job for last dut job +def test_create_two_dut_jobs_and_add_phase(custom_logger): + # Create the first DUT job + custom_logger.create_dut_job(status="pass") + + # Create the second DUT job + custom_logger.create_dut_job(status="fail") + + logger_data = custom_logger.logger.data + + assert "dut_jobs" in logger_data + assert isinstance(logger_data["dut_jobs"], list) + assert len(logger_data["dut_jobs"]) == 2 + + first_dut_job = logger_data["dut_jobs"][0] + second_dut_job = logger_data["dut_jobs"][1] + + # Add a phase to the second DUT job + custom_logger.create_job_phase("Phase1") + + logger_data = custom_logger.logger.data + + assert "dut_jobs" in logger_data + assert isinstance(logger_data["dut_jobs"], list) + assert len(logger_data["dut_jobs"]) == 2 + + first_dut_job = logger_data["dut_jobs"][0] + second_dut_job = logger_data["dut_jobs"][1] + + # Check first DUT job does not have a phase + assert not first_dut_job.get("dut_job_phases") + + # Check second DUT job has a phase + assert second_dut_job.get("dut_job_phases") + assert isinstance(second_dut_job["dut_job_phases"], list) + assert len(second_dut_job["dut_job_phases"]) == 1 + + +# Test case for updating DUT start time +def test_update_dut_start_time(custom_logger): + custom_logger.create_dut_job(status="pass") + custom_logger.update_dut_time("start", None) + + logger_data = custom_logger.logger.data + assert "dut_jobs" in logger_data + assert len(logger_data["dut_jobs"]) == 1 + + dut_job = logger_data["dut_jobs"][0] + assert "dut_start_time" in dut_job + assert dut_job["dut_start_time"] != "" + + try: + datetime.fromisoformat(dut_job["dut_start_time"]) + assert True + except ValueError: + assert False + + +# Test case for updating DUT submit time +def test_update_dut_submit_time(custom_logger): + custom_time = "2023-11-09T02:37:06Z" + custom_logger.create_dut_job(status="pass") + custom_logger.update_dut_time("submit", custom_time) + + logger_data = custom_logger.logger.data + assert "dut_jobs" in logger_data + assert len(logger_data["dut_jobs"]) == 1 + + dut_job = logger_data["dut_jobs"][0] + assert "dut_submit_time" in dut_job + + try: + datetime.fromisoformat(dut_job["dut_submit_time"]) + assert True + except ValueError: + assert False + + +# Test case for updating DUT end time +def test_update_dut_end_time(custom_logger): + custom_logger.create_dut_job(status="pass") + custom_logger.update_dut_time("end", None) + + logger_data = custom_logger.logger.data + assert "dut_jobs" in logger_data + assert len(logger_data["dut_jobs"]) == 1 + + dut_job = logger_data["dut_jobs"][0] + assert "dut_end_time" in dut_job + + try: + datetime.fromisoformat(dut_job["dut_end_time"]) + assert True + except ValueError: + assert False + + +# Test case for updating DUT time with invalid value +def test_update_dut_time_invalid_value(custom_logger): + custom_logger.create_dut_job(status="pass") + with pytest.raises( + ValueError, + match="Error: Invalid argument provided for --update-dut-time. Use 'start', 'submit', 'end'.", + ): + custom_logger.update_dut_time("invalid_value", None) + + +# Test case for close_dut_job +def test_close_dut_job(custom_logger): + custom_logger.create_dut_job(status="pass") + + custom_logger.create_job_phase("Phase1") + custom_logger.create_job_phase("Phase2") + + custom_logger.close_dut_job() + + logger_data = custom_logger.logger.data + assert "dut_jobs" in logger_data + assert len(logger_data["dut_jobs"]) == 1 + + dut_job = logger_data["dut_jobs"][0] + assert "dut_job_phases" in dut_job + dut_job_phases = dut_job["dut_job_phases"] + + phase1 = dut_job_phases[0] + assert phase1["name"] == "Phase1" + + try: + datetime.fromisoformat(phase1["start_time"]) + assert True + except ValueError: + assert False + + try: + datetime.fromisoformat(phase1["end_time"]) + assert True + except ValueError: + assert False + + phase2 = dut_job_phases[1] + assert phase2["name"] == "Phase2" + + try: + datetime.fromisoformat(phase2["start_time"]) + assert True + except ValueError: + assert False + + try: + datetime.fromisoformat(phase2["end_time"]) + assert True + except ValueError: + assert False + + +# Test case for close +def test_close(custom_logger): + custom_logger.create_dut_job(status="pass") + + custom_logger.close() + + logger_data = custom_logger.logger.data + assert "dut_jobs" in logger_data + assert len(logger_data["dut_jobs"]) == 1 + assert "dut_attempt_counter" in logger_data + assert logger_data["dut_attempt_counter"] == len(logger_data["dut_jobs"]) + assert "job_combined_status" in logger_data + assert logger_data["job_combined_status"] != "" + + dut_job = logger_data["dut_jobs"][0] + assert "submitter_end_time" in dut_job + try: + datetime.fromisoformat(dut_job["submitter_end_time"]) + assert True + except ValueError: + assert False + + +# Test case for updating status to fail with a reason +def test_update_status_fail_with_reason(custom_logger): + custom_logger.create_dut_job() + + reason = "kernel panic" + custom_logger.update_status_fail(reason) + + logger_data = custom_logger.logger.data + assert "dut_jobs" in logger_data + assert len(logger_data["dut_jobs"]) == 1 + + dut_job = logger_data["dut_jobs"][0] + assert "status" in dut_job + assert dut_job["status"] == "fail" + assert "dut_job_fail_reason" in dut_job + assert dut_job["dut_job_fail_reason"] == reason + + +# Test case for updating status to fail without providing a reason +def test_update_status_fail_without_reason(custom_logger): + custom_logger.create_dut_job() + + custom_logger.update_status_fail() + + # Check if the status is updated and fail reason is empty + logger_data = custom_logger.logger.data + assert "dut_jobs" in logger_data + assert len(logger_data["dut_jobs"]) == 1 + + dut_job = logger_data["dut_jobs"][0] + assert "status" in dut_job + assert dut_job["status"] == "fail" + assert "dut_job_fail_reason" in dut_job + assert dut_job["dut_job_fail_reason"] == "" + + +# Test case for check_dut_timings with submission time earlier than start time +def test_check_dut_timings_submission_earlier_than_start(custom_logger, caplog): + custom_logger.create_dut_job() + + # Set submission time to be earlier than start time + custom_logger.update_dut_time("start", "2023-01-01T11:00:00") + custom_logger.update_dut_time("submit", "2023-01-01T12:00:00") + + logger_data = custom_logger.logger.data + assert "dut_jobs" in logger_data + assert len(logger_data["dut_jobs"]) == 1 + + job = logger_data["dut_jobs"][0] + + # Call check_dut_timings + custom_logger.check_dut_timings(job) + + # Check if an error message is logged + assert "Job submission is happening before job start." in caplog.text + + +# Test case for check_dut_timings with end time earlier than start time +def test_check_dut_timings_end_earlier_than_start(custom_logger, caplog): + custom_logger.create_dut_job() + + # Set end time to be earlier than start time + custom_logger.update_dut_time("end", "2023-01-01T11:00:00") + custom_logger.update_dut_time("start", "2023-01-01T12:00:00") + + logger_data = custom_logger.logger.data + assert "dut_jobs" in logger_data + assert len(logger_data["dut_jobs"]) == 1 + + job = logger_data["dut_jobs"][0] + + # Call check_dut_timings + custom_logger.check_dut_timings(job) + + # Check if an error message is logged + assert "Job ended before it started." in caplog.text + + +# Test case for check_dut_timings with valid timing sequence +def test_check_dut_timings_valid_timing_sequence(custom_logger, caplog): + custom_logger.create_dut_job() + + # Set valid timing sequence + custom_logger.update_dut_time("submit", "2023-01-01T12:00:00") + custom_logger.update_dut_time("start", "2023-01-01T12:30:00") + custom_logger.update_dut_time("end", "2023-01-01T13:00:00") + + logger_data = custom_logger.logger.data + assert "dut_jobs" in logger_data + assert len(logger_data["dut_jobs"]) == 1 + + job = logger_data["dut_jobs"][0] + + # Call check_dut_timings + custom_logger.check_dut_timings(job) + + # Check that no error messages are logged + assert "Job submission is happening before job start." not in caplog.text + assert "Job ended before it started." not in caplog.text