670 lines
20 KiB
Python
670 lines
20 KiB
Python
![]() |
import logging
|
||
|
import subprocess
|
||
|
from datetime import datetime
|
||
|
|
||
|
import pytest
|
||
|
from custom_logger import CustomLogger
|
||
|
|
||
|
|
||
|
@pytest.fixture
|
||
|
def tmp_log_file(tmp_path):
|
||
|
return tmp_path / "test_log.json"
|
||
|
|
||
|
|
||
|
@pytest.fixture
|
||
|
def custom_logger(tmp_log_file):
|
||
|
return CustomLogger(tmp_log_file)
|
||
|
|
||
|
|
||
|
def run_script_with_args(args):
|
||
|
import custom_logger
|
||
|
|
||
|
script_path = custom_logger.__file__
|
||
|
return subprocess.run(
|
||
|
["python3", str(script_path), *args], capture_output=True, text=True
|
||
|
)
|
||
|
|
||
|
|
||
|
# Test case for missing log file
|
||
|
@pytest.mark.parametrize(
|
||
|
"key, value", [("dut_attempt_counter", "1"), ("job_combined_status", "pass")]
|
||
|
)
|
||
|
def test_missing_log_file_argument(key, value):
|
||
|
result = run_script_with_args(["--update", "key", "value"])
|
||
|
assert result.returncode != 0
|
||
|
|
||
|
|
||
|
# Parametrize test case for valid update arguments
|
||
|
@pytest.mark.parametrize(
|
||
|
"key, value", [("dut_attempt_counter", "1"), ("job_combined_status", "pass")]
|
||
|
)
|
||
|
def test_update_argument_valid(custom_logger, tmp_log_file, key, value):
|
||
|
result = run_script_with_args([str(tmp_log_file), "--update", key, value])
|
||
|
assert result.returncode == 0
|
||
|
|
||
|
|
||
|
# Test case for passing only the key without a value
|
||
|
def test_update_argument_key_only(custom_logger, tmp_log_file):
|
||
|
key = "dut_attempt_counter"
|
||
|
result = run_script_with_args([str(tmp_log_file), "--update", key])
|
||
|
assert result.returncode != 0
|
||
|
|
||
|
|
||
|
# Test case for not passing any key-value pair
|
||
|
def test_update_argument_no_values(custom_logger, tmp_log_file):
|
||
|
result = run_script_with_args([str(tmp_log_file), "--update"])
|
||
|
assert result.returncode == 0
|
||
|
|
||
|
|
||
|
# Parametrize test case for valid arguments
|
||
|
@pytest.mark.parametrize(
|
||
|
"key, value", [("dut_attempt_counter", "1"), ("job_combined_status", "pass")]
|
||
|
)
|
||
|
def test_create_argument_valid(custom_logger, tmp_log_file, key, value):
|
||
|
result = run_script_with_args([str(tmp_log_file), "--create-dut-job", key, value])
|
||
|
assert result.returncode == 0
|
||
|
|
||
|
|
||
|
# Test case for passing only the key without a value
|
||
|
def test_create_argument_key_only(custom_logger, tmp_log_file):
|
||
|
key = "dut_attempt_counter"
|
||
|
result = run_script_with_args([str(tmp_log_file), "--create-dut-job", key])
|
||
|
assert result.returncode != 0
|
||
|
|
||
|
|
||
|
# Test case for not passing any key-value pair
|
||
|
def test_create_argument_no_values(custom_logger, tmp_log_file):
|
||
|
result = run_script_with_args([str(tmp_log_file), "--create-dut-job"])
|
||
|
assert result.returncode == 0
|
||
|
|
||
|
|
||
|
# Test case for updating a DUT job
|
||
|
@pytest.mark.parametrize(
|
||
|
"key, value", [("status", "hung"), ("dut_state", "Canceling"), ("dut_name", "asus")]
|
||
|
)
|
||
|
def test_update_dut_job(custom_logger, tmp_log_file, key, value):
|
||
|
result = run_script_with_args([str(tmp_log_file), "--update-dut-job", key, value])
|
||
|
assert result.returncode != 0
|
||
|
|
||
|
result = run_script_with_args([str(tmp_log_file), "--create-dut-job", key, value])
|
||
|
assert result.returncode == 0
|
||
|
|
||
|
result = run_script_with_args([str(tmp_log_file), "--update-dut-job", key, value])
|
||
|
assert result.returncode == 0
|
||
|
|
||
|
|
||
|
# Test case for updating last DUT job
|
||
|
def test_update_dut_multiple_job(custom_logger, tmp_log_file):
|
||
|
# Create the first DUT job with the first key
|
||
|
result = run_script_with_args(
|
||
|
[str(tmp_log_file), "--create-dut-job", "status", "hung"]
|
||
|
)
|
||
|
assert result.returncode == 0
|
||
|
|
||
|
# Create the second DUT job with the second key
|
||
|
result = run_script_with_args(
|
||
|
[str(tmp_log_file), "--create-dut-job", "dut_state", "Canceling"]
|
||
|
)
|
||
|
assert result.returncode == 0
|
||
|
|
||
|
result = run_script_with_args(
|
||
|
[str(tmp_log_file), "--update-dut-job", "dut_name", "asus"]
|
||
|
)
|
||
|
assert result.returncode == 0
|
||
|
|
||
|
|
||
|
# Parametrize test case for valid phase arguments
|
||
|
@pytest.mark.parametrize(
|
||
|
"phase_name",
|
||
|
[("Phase1"), ("Phase2"), ("Phase3")],
|
||
|
)
|
||
|
def test_create_job_phase_valid(custom_logger, tmp_log_file, phase_name):
|
||
|
custom_logger.create_dut_job(status="pass")
|
||
|
|
||
|
result = run_script_with_args([str(tmp_log_file), "--create-job-phase", phase_name])
|
||
|
assert result.returncode == 0
|
||
|
|
||
|
|
||
|
# Test case for not passing any arguments for create-job-phase
|
||
|
def test_create_job_phase_no_arguments(custom_logger, tmp_log_file):
|
||
|
custom_logger.create_dut_job(status="pass")
|
||
|
|
||
|
result = run_script_with_args([str(tmp_log_file), "--create-job-phase"])
|
||
|
assert result.returncode != 0
|
||
|
|
||
|
|
||
|
# Test case for trying to create a phase job without an existing DUT job
|
||
|
def test_create_job_phase_no_dut_job(custom_logger, tmp_log_file):
|
||
|
phase_name = "Phase1"
|
||
|
|
||
|
result = run_script_with_args([str(tmp_log_file), "--create-job-phase", phase_name])
|
||
|
assert result.returncode != 0
|
||
|
|
||
|
|
||
|
# Combined test cases for valid scenarios
|
||
|
def test_valid_scenarios(custom_logger, tmp_log_file):
|
||
|
valid_update_args = [("dut_attempt_counter", "1"), ("job_combined_status", "pass")]
|
||
|
for key, value in valid_update_args:
|
||
|
result = run_script_with_args([str(tmp_log_file), "--update", key, value])
|
||
|
assert result.returncode == 0
|
||
|
|
||
|
valid_create_args = [
|
||
|
("status", "hung"),
|
||
|
("dut_state", "Canceling"),
|
||
|
("dut_name", "asus"),
|
||
|
("phase_name", "Bootloader"),
|
||
|
]
|
||
|
for key, value in valid_create_args:
|
||
|
result = run_script_with_args(
|
||
|
[str(tmp_log_file), "--create-dut-job", key, value]
|
||
|
)
|
||
|
assert result.returncode == 0
|
||
|
|
||
|
result = run_script_with_args(
|
||
|
[str(tmp_log_file), "--create-dut-job", "status", "hung"]
|
||
|
)
|
||
|
assert result.returncode == 0
|
||
|
|
||
|
result = run_script_with_args(
|
||
|
[str(tmp_log_file), "--update-dut-job", "dut_name", "asus"]
|
||
|
)
|
||
|
assert result.returncode == 0
|
||
|
|
||
|
result = run_script_with_args(
|
||
|
[
|
||
|
str(tmp_log_file),
|
||
|
"--create-job-phase",
|
||
|
"phase_name",
|
||
|
]
|
||
|
)
|
||
|
assert result.returncode == 0
|
||
|
|
||
|
|
||
|
# Parametrize test case for valid update arguments
|
||
|
@pytest.mark.parametrize(
|
||
|
"key, value", [("dut_attempt_counter", "1"), ("job_combined_status", "pass")]
|
||
|
)
|
||
|
def test_update(custom_logger, key, value):
|
||
|
custom_logger.update(**{key: value})
|
||
|
logger_data = custom_logger.logger.data
|
||
|
|
||
|
assert key in logger_data
|
||
|
assert logger_data[key] == value
|
||
|
|
||
|
|
||
|
# Test case for updating with a key that already exists
|
||
|
def test_update_existing_key(custom_logger):
|
||
|
key = "status"
|
||
|
value = "new_value"
|
||
|
custom_logger.logger.data[key] = "old_value"
|
||
|
custom_logger.update(**{key: value})
|
||
|
logger_data = custom_logger.logger.data
|
||
|
|
||
|
assert key in logger_data
|
||
|
assert logger_data[key] == value
|
||
|
|
||
|
|
||
|
# Test case for updating "dut_jobs"
|
||
|
def test_update_dut_jobs(custom_logger):
|
||
|
key1 = "status"
|
||
|
value1 = "fail"
|
||
|
key2 = "state"
|
||
|
value2 = "hung"
|
||
|
|
||
|
custom_logger.create_dut_job(**{key1: value1})
|
||
|
logger_data = custom_logger.logger.data
|
||
|
|
||
|
job1 = logger_data["dut_jobs"][0]
|
||
|
assert key1 in job1
|
||
|
assert job1[key1] == value1
|
||
|
|
||
|
custom_logger.update_dut_job(key2, value2)
|
||
|
logger_data = custom_logger.logger.data
|
||
|
|
||
|
job2 = logger_data["dut_jobs"][0]
|
||
|
assert key2 in job2
|
||
|
assert job2[key2] == value2
|
||
|
|
||
|
|
||
|
# Test case for creating and updating DUT job
|
||
|
def test_create_dut_job(custom_logger):
|
||
|
key = "status"
|
||
|
value1 = "pass"
|
||
|
value2 = "fail"
|
||
|
value3 = "hung"
|
||
|
|
||
|
reason = "job_combined_status"
|
||
|
result = "Finished"
|
||
|
|
||
|
custom_logger.update(**{reason: result})
|
||
|
logger_data = custom_logger.logger.data
|
||
|
|
||
|
assert reason in logger_data
|
||
|
assert logger_data[reason] == result
|
||
|
|
||
|
# Create the first DUT job
|
||
|
custom_logger.create_dut_job(**{key: value1})
|
||
|
logger_data = custom_logger.logger.data
|
||
|
|
||
|
assert "dut_jobs" in logger_data
|
||
|
assert isinstance(logger_data["dut_jobs"], list)
|
||
|
assert len(logger_data["dut_jobs"]) == 1
|
||
|
assert isinstance(logger_data["dut_jobs"][0], dict)
|
||
|
|
||
|
# Check the values of the keys in the created first DUT job
|
||
|
job1 = logger_data["dut_jobs"][0]
|
||
|
assert key in job1
|
||
|
assert job1[key] == value1
|
||
|
|
||
|
# Create the second DUT job
|
||
|
custom_logger.create_dut_job(**{key: value2})
|
||
|
logger_data = custom_logger.logger.data
|
||
|
|
||
|
assert "dut_jobs" in logger_data
|
||
|
assert isinstance(logger_data["dut_jobs"], list)
|
||
|
assert len(logger_data["dut_jobs"]) == 2
|
||
|
assert isinstance(logger_data["dut_jobs"][1], dict)
|
||
|
|
||
|
# Check the values of the keys in the created second DUT job
|
||
|
job2 = logger_data["dut_jobs"][1]
|
||
|
assert key in job2
|
||
|
assert job2[key] == value2
|
||
|
|
||
|
# Update the second DUT job with value3
|
||
|
custom_logger.update_dut_job(key, value3)
|
||
|
logger_data = custom_logger.logger.data
|
||
|
|
||
|
# Check the updated value in the second DUT job
|
||
|
job2 = logger_data["dut_jobs"][1]
|
||
|
assert key in job2
|
||
|
assert job2[key] == value3
|
||
|
|
||
|
# Find the index of the last DUT job
|
||
|
last_job_index = len(logger_data["dut_jobs"]) - 1
|
||
|
|
||
|
# Update the last DUT job
|
||
|
custom_logger.update_dut_job("dut_name", "asus")
|
||
|
logger_data = custom_logger.logger.data
|
||
|
|
||
|
# Check the updated value in the last DUT job
|
||
|
job2 = logger_data["dut_jobs"][last_job_index]
|
||
|
assert "dut_name" in job2
|
||
|
assert job2["dut_name"] == "asus"
|
||
|
|
||
|
# Check that "dut_name" is not present in other DUT jobs
|
||
|
for idx, job in enumerate(logger_data["dut_jobs"]):
|
||
|
if idx != last_job_index:
|
||
|
assert job.get("dut_name") == ""
|
||
|
|
||
|
|
||
|
# Test case for updating with missing "dut_jobs" key
|
||
|
def test_update_dut_job_missing_dut_jobs(custom_logger):
|
||
|
key = "status"
|
||
|
value = "fail"
|
||
|
|
||
|
# Attempt to update a DUT job when "dut_jobs" is missing
|
||
|
with pytest.raises(ValueError, match="No DUT jobs found."):
|
||
|
custom_logger.update_dut_job(key, value)
|
||
|
|
||
|
|
||
|
# Test case for creating a job phase
|
||
|
def test_create_job_phase(custom_logger):
|
||
|
custom_logger.create_dut_job(status="pass")
|
||
|
phase_name = "Phase1"
|
||
|
|
||
|
custom_logger.create_job_phase(phase_name)
|
||
|
logger_data = custom_logger.logger.data
|
||
|
|
||
|
assert "dut_jobs" in logger_data
|
||
|
assert isinstance(logger_data["dut_jobs"], list)
|
||
|
assert len(logger_data["dut_jobs"]) == 1
|
||
|
|
||
|
job = logger_data["dut_jobs"][0]
|
||
|
assert "dut_job_phases" in job
|
||
|
assert isinstance(job["dut_job_phases"], list)
|
||
|
assert len(job["dut_job_phases"]) == 1
|
||
|
|
||
|
phase = job["dut_job_phases"][0]
|
||
|
assert phase["name"] == phase_name
|
||
|
try:
|
||
|
datetime.fromisoformat(phase["start_time"])
|
||
|
assert True
|
||
|
except ValueError:
|
||
|
assert False
|
||
|
assert phase["end_time"] == ""
|
||
|
|
||
|
|
||
|
# Test case for creating multiple phase jobs
|
||
|
def test_create_multiple_phase_jobs(custom_logger):
|
||
|
custom_logger.create_dut_job(status="pass")
|
||
|
|
||
|
phase_data = [
|
||
|
{
|
||
|
"phase_name": "Phase1",
|
||
|
},
|
||
|
{
|
||
|
"phase_name": "Phase2",
|
||
|
},
|
||
|
{
|
||
|
"phase_name": "Phase3",
|
||
|
},
|
||
|
]
|
||
|
|
||
|
for data in phase_data:
|
||
|
phase_name = data["phase_name"]
|
||
|
|
||
|
custom_logger.create_job_phase(phase_name)
|
||
|
|
||
|
logger_data = custom_logger.logger.data
|
||
|
|
||
|
assert "dut_jobs" in logger_data
|
||
|
assert isinstance(logger_data["dut_jobs"], list)
|
||
|
assert len(logger_data["dut_jobs"]) == 1
|
||
|
|
||
|
job = logger_data["dut_jobs"][0]
|
||
|
assert "dut_job_phases" in job
|
||
|
assert isinstance(job["dut_job_phases"], list)
|
||
|
assert len(job["dut_job_phases"]) == len(phase_data)
|
||
|
|
||
|
for data in phase_data:
|
||
|
phase_name = data["phase_name"]
|
||
|
|
||
|
phase = job["dut_job_phases"][phase_data.index(data)]
|
||
|
|
||
|
assert phase["name"] == phase_name
|
||
|
try:
|
||
|
datetime.fromisoformat(phase["start_time"])
|
||
|
assert True
|
||
|
except ValueError:
|
||
|
assert False
|
||
|
|
||
|
if phase_data.index(data) != len(phase_data) - 1:
|
||
|
try:
|
||
|
datetime.fromisoformat(phase["end_time"])
|
||
|
assert True
|
||
|
except ValueError:
|
||
|
assert False
|
||
|
|
||
|
# Check if the end_time of the last phase is an empty string
|
||
|
last_phase = job["dut_job_phases"][-1]
|
||
|
assert last_phase["end_time"] == ""
|
||
|
|
||
|
|
||
|
# Test case for creating multiple dut jobs and updating phase job for last dut job
|
||
|
def test_create_two_dut_jobs_and_add_phase(custom_logger):
|
||
|
# Create the first DUT job
|
||
|
custom_logger.create_dut_job(status="pass")
|
||
|
|
||
|
# Create the second DUT job
|
||
|
custom_logger.create_dut_job(status="fail")
|
||
|
|
||
|
logger_data = custom_logger.logger.data
|
||
|
|
||
|
assert "dut_jobs" in logger_data
|
||
|
assert isinstance(logger_data["dut_jobs"], list)
|
||
|
assert len(logger_data["dut_jobs"]) == 2
|
||
|
|
||
|
first_dut_job = logger_data["dut_jobs"][0]
|
||
|
second_dut_job = logger_data["dut_jobs"][1]
|
||
|
|
||
|
# Add a phase to the second DUT job
|
||
|
custom_logger.create_job_phase("Phase1")
|
||
|
|
||
|
logger_data = custom_logger.logger.data
|
||
|
|
||
|
assert "dut_jobs" in logger_data
|
||
|
assert isinstance(logger_data["dut_jobs"], list)
|
||
|
assert len(logger_data["dut_jobs"]) == 2
|
||
|
|
||
|
first_dut_job = logger_data["dut_jobs"][0]
|
||
|
second_dut_job = logger_data["dut_jobs"][1]
|
||
|
|
||
|
# Check first DUT job does not have a phase
|
||
|
assert not first_dut_job.get("dut_job_phases")
|
||
|
|
||
|
# Check second DUT job has a phase
|
||
|
assert second_dut_job.get("dut_job_phases")
|
||
|
assert isinstance(second_dut_job["dut_job_phases"], list)
|
||
|
assert len(second_dut_job["dut_job_phases"]) == 1
|
||
|
|
||
|
|
||
|
# Test case for updating DUT start time
|
||
|
def test_update_dut_start_time(custom_logger):
|
||
|
custom_logger.create_dut_job(status="pass")
|
||
|
custom_logger.update_dut_time("start", None)
|
||
|
|
||
|
logger_data = custom_logger.logger.data
|
||
|
assert "dut_jobs" in logger_data
|
||
|
assert len(logger_data["dut_jobs"]) == 1
|
||
|
|
||
|
dut_job = logger_data["dut_jobs"][0]
|
||
|
assert "dut_start_time" in dut_job
|
||
|
assert dut_job["dut_start_time"] != ""
|
||
|
|
||
|
try:
|
||
|
datetime.fromisoformat(dut_job["dut_start_time"])
|
||
|
assert True
|
||
|
except ValueError:
|
||
|
assert False
|
||
|
|
||
|
|
||
|
# Test case for updating DUT submit time
|
||
|
def test_update_dut_submit_time(custom_logger):
|
||
|
custom_time = "2023-11-09T02:37:06Z"
|
||
|
custom_logger.create_dut_job(status="pass")
|
||
|
custom_logger.update_dut_time("submit", custom_time)
|
||
|
|
||
|
logger_data = custom_logger.logger.data
|
||
|
assert "dut_jobs" in logger_data
|
||
|
assert len(logger_data["dut_jobs"]) == 1
|
||
|
|
||
|
dut_job = logger_data["dut_jobs"][0]
|
||
|
assert "dut_submit_time" in dut_job
|
||
|
|
||
|
try:
|
||
|
datetime.fromisoformat(dut_job["dut_submit_time"])
|
||
|
assert True
|
||
|
except ValueError:
|
||
|
assert False
|
||
|
|
||
|
|
||
|
# Test case for updating DUT end time
|
||
|
def test_update_dut_end_time(custom_logger):
|
||
|
custom_logger.create_dut_job(status="pass")
|
||
|
custom_logger.update_dut_time("end", None)
|
||
|
|
||
|
logger_data = custom_logger.logger.data
|
||
|
assert "dut_jobs" in logger_data
|
||
|
assert len(logger_data["dut_jobs"]) == 1
|
||
|
|
||
|
dut_job = logger_data["dut_jobs"][0]
|
||
|
assert "dut_end_time" in dut_job
|
||
|
|
||
|
try:
|
||
|
datetime.fromisoformat(dut_job["dut_end_time"])
|
||
|
assert True
|
||
|
except ValueError:
|
||
|
assert False
|
||
|
|
||
|
|
||
|
# Test case for updating DUT time with invalid value
|
||
|
def test_update_dut_time_invalid_value(custom_logger):
|
||
|
custom_logger.create_dut_job(status="pass")
|
||
|
with pytest.raises(
|
||
|
ValueError,
|
||
|
match="Error: Invalid argument provided for --update-dut-time. Use 'start', 'submit', 'end'.",
|
||
|
):
|
||
|
custom_logger.update_dut_time("invalid_value", None)
|
||
|
|
||
|
|
||
|
# Test case for close_dut_job
|
||
|
def test_close_dut_job(custom_logger):
|
||
|
custom_logger.create_dut_job(status="pass")
|
||
|
|
||
|
custom_logger.create_job_phase("Phase1")
|
||
|
custom_logger.create_job_phase("Phase2")
|
||
|
|
||
|
custom_logger.close_dut_job()
|
||
|
|
||
|
logger_data = custom_logger.logger.data
|
||
|
assert "dut_jobs" in logger_data
|
||
|
assert len(logger_data["dut_jobs"]) == 1
|
||
|
|
||
|
dut_job = logger_data["dut_jobs"][0]
|
||
|
assert "dut_job_phases" in dut_job
|
||
|
dut_job_phases = dut_job["dut_job_phases"]
|
||
|
|
||
|
phase1 = dut_job_phases[0]
|
||
|
assert phase1["name"] == "Phase1"
|
||
|
|
||
|
try:
|
||
|
datetime.fromisoformat(phase1["start_time"])
|
||
|
assert True
|
||
|
except ValueError:
|
||
|
assert False
|
||
|
|
||
|
try:
|
||
|
datetime.fromisoformat(phase1["end_time"])
|
||
|
assert True
|
||
|
except ValueError:
|
||
|
assert False
|
||
|
|
||
|
phase2 = dut_job_phases[1]
|
||
|
assert phase2["name"] == "Phase2"
|
||
|
|
||
|
try:
|
||
|
datetime.fromisoformat(phase2["start_time"])
|
||
|
assert True
|
||
|
except ValueError:
|
||
|
assert False
|
||
|
|
||
|
try:
|
||
|
datetime.fromisoformat(phase2["end_time"])
|
||
|
assert True
|
||
|
except ValueError:
|
||
|
assert False
|
||
|
|
||
|
|
||
|
# Test case for close
|
||
|
def test_close(custom_logger):
|
||
|
custom_logger.create_dut_job(status="pass")
|
||
|
|
||
|
custom_logger.close()
|
||
|
|
||
|
logger_data = custom_logger.logger.data
|
||
|
assert "dut_jobs" in logger_data
|
||
|
assert len(logger_data["dut_jobs"]) == 1
|
||
|
assert "dut_attempt_counter" in logger_data
|
||
|
assert logger_data["dut_attempt_counter"] == len(logger_data["dut_jobs"])
|
||
|
assert "job_combined_status" in logger_data
|
||
|
assert logger_data["job_combined_status"] != ""
|
||
|
|
||
|
dut_job = logger_data["dut_jobs"][0]
|
||
|
assert "submitter_end_time" in dut_job
|
||
|
try:
|
||
|
datetime.fromisoformat(dut_job["submitter_end_time"])
|
||
|
assert True
|
||
|
except ValueError:
|
||
|
assert False
|
||
|
|
||
|
|
||
|
# Test case for updating status to fail with a reason
|
||
|
def test_update_status_fail_with_reason(custom_logger):
|
||
|
custom_logger.create_dut_job()
|
||
|
|
||
|
reason = "kernel panic"
|
||
|
custom_logger.update_status_fail(reason)
|
||
|
|
||
|
logger_data = custom_logger.logger.data
|
||
|
assert "dut_jobs" in logger_data
|
||
|
assert len(logger_data["dut_jobs"]) == 1
|
||
|
|
||
|
dut_job = logger_data["dut_jobs"][0]
|
||
|
assert "status" in dut_job
|
||
|
assert dut_job["status"] == "fail"
|
||
|
assert "dut_job_fail_reason" in dut_job
|
||
|
assert dut_job["dut_job_fail_reason"] == reason
|
||
|
|
||
|
|
||
|
# Test case for updating status to fail without providing a reason
|
||
|
def test_update_status_fail_without_reason(custom_logger):
|
||
|
custom_logger.create_dut_job()
|
||
|
|
||
|
custom_logger.update_status_fail()
|
||
|
|
||
|
# Check if the status is updated and fail reason is empty
|
||
|
logger_data = custom_logger.logger.data
|
||
|
assert "dut_jobs" in logger_data
|
||
|
assert len(logger_data["dut_jobs"]) == 1
|
||
|
|
||
|
dut_job = logger_data["dut_jobs"][0]
|
||
|
assert "status" in dut_job
|
||
|
assert dut_job["status"] == "fail"
|
||
|
assert "dut_job_fail_reason" in dut_job
|
||
|
assert dut_job["dut_job_fail_reason"] == ""
|
||
|
|
||
|
|
||
|
# Test case for check_dut_timings with submission time earlier than start time
|
||
|
def test_check_dut_timings_submission_earlier_than_start(custom_logger, caplog):
|
||
|
custom_logger.create_dut_job()
|
||
|
|
||
|
# Set submission time to be earlier than start time
|
||
|
custom_logger.update_dut_time("start", "2023-01-01T11:00:00")
|
||
|
custom_logger.update_dut_time("submit", "2023-01-01T12:00:00")
|
||
|
|
||
|
logger_data = custom_logger.logger.data
|
||
|
assert "dut_jobs" in logger_data
|
||
|
assert len(logger_data["dut_jobs"]) == 1
|
||
|
|
||
|
job = logger_data["dut_jobs"][0]
|
||
|
|
||
|
# Call check_dut_timings
|
||
|
custom_logger.check_dut_timings(job)
|
||
|
|
||
|
# Check if an error message is logged
|
||
|
assert "Job submission is happening before job start." in caplog.text
|
||
|
|
||
|
|
||
|
# Test case for check_dut_timings with end time earlier than start time
|
||
|
def test_check_dut_timings_end_earlier_than_start(custom_logger, caplog):
|
||
|
custom_logger.create_dut_job()
|
||
|
|
||
|
# Set end time to be earlier than start time
|
||
|
custom_logger.update_dut_time("end", "2023-01-01T11:00:00")
|
||
|
custom_logger.update_dut_time("start", "2023-01-01T12:00:00")
|
||
|
|
||
|
logger_data = custom_logger.logger.data
|
||
|
assert "dut_jobs" in logger_data
|
||
|
assert len(logger_data["dut_jobs"]) == 1
|
||
|
|
||
|
job = logger_data["dut_jobs"][0]
|
||
|
|
||
|
# Call check_dut_timings
|
||
|
custom_logger.check_dut_timings(job)
|
||
|
|
||
|
# Check if an error message is logged
|
||
|
assert "Job ended before it started." in caplog.text
|
||
|
|
||
|
|
||
|
# Test case for check_dut_timings with valid timing sequence
|
||
|
def test_check_dut_timings_valid_timing_sequence(custom_logger, caplog):
|
||
|
custom_logger.create_dut_job()
|
||
|
|
||
|
# Set valid timing sequence
|
||
|
custom_logger.update_dut_time("submit", "2023-01-01T12:00:00")
|
||
|
custom_logger.update_dut_time("start", "2023-01-01T12:30:00")
|
||
|
custom_logger.update_dut_time("end", "2023-01-01T13:00:00")
|
||
|
|
||
|
logger_data = custom_logger.logger.data
|
||
|
assert "dut_jobs" in logger_data
|
||
|
assert len(logger_data["dut_jobs"]) == 1
|
||
|
|
||
|
job = logger_data["dut_jobs"][0]
|
||
|
|
||
|
# Call check_dut_timings
|
||
|
custom_logger.check_dut_timings(job)
|
||
|
|
||
|
# Check that no error messages are logged
|
||
|
assert "Job submission is happening before job start." not in caplog.text
|
||
|
assert "Job ended before it started." not in caplog.text
|