ci: run_n_monitor, collect and summarize

The job duration is printed during the test progress. It can be collected and
summarized at the end of the process. It is interesting when doing a stress
test as one will have this information together at the end.

Signed-off-by: Sergi Blanch Torne <sergi.blanch.torne@collabora.com>
Part-of: <https://gitlab.freedesktop.org/mesa/mesa/-/merge_requests/29419>
This commit is contained in:
Sergi Blanch Torne
2024-05-23 14:03:50 +02:00
committed by Marge Bot
parent 5726ecae3e
commit 0c092dc5c4

View File

@@ -21,7 +21,7 @@ from concurrent.futures import ThreadPoolExecutor
from functools import partial
from itertools import chain
from subprocess import check_output, CalledProcessError
from typing import TYPE_CHECKING, Iterable, Literal, Optional
from typing import Dict, TYPE_CHECKING, Iterable, Literal, Optional, Tuple
import gitlab
import gitlab.v4.objects
@@ -69,23 +69,31 @@ def print_job_status(job, new_status=False) -> None:
if new_status and job.status == "created":
return
if job.duration:
duration = job.duration
elif job.started_at:
duration = time.perf_counter() - time.mktime(job.started_at.timetuple())
duration = job_duration(job)
print(
STATUS_COLORS[job.status]
+ "🞋 job "
+ URL_START
+ f"{job.web_url}\a{job.name}"
+ URL_END
+ link2print(job.web_url, job.name)
+ (f" has new status: {job.status}" if new_status else f" :: {job.status}")
+ (f" ({pretty_duration(duration)})" if job.started_at else "")
+ Style.RESET_ALL
)
def job_duration(job: gitlab.v4.objects.ProjectPipelineJob) -> float:
"""
Given a job, report the time lapsed in execution.
:param job: Pipeline job
:return: Current time in execution
"""
if job.duration:
return job.duration
elif job.started_at:
return time.perf_counter() - time.mktime(job.started_at.timetuple())
return 0.0
def pretty_wait(sec: int) -> None:
"""shows progressbar in dots"""
for val in range(sec, 0, -1):
@@ -100,11 +108,12 @@ def monitor_pipeline(
dependencies,
force_manual: bool,
stress: int,
) -> tuple[Optional[int], Optional[int]]:
) -> tuple[Optional[int], Optional[int], Dict[str, Dict[int, Tuple[float, str, str]]]]:
"""Monitors pipeline and delegate canceling jobs"""
statuses: dict[str, str] = defaultdict(str)
target_statuses: dict[str, str] = defaultdict(str)
stress_status_counter = defaultdict(lambda: defaultdict(int))
execution_times = defaultdict(lambda: defaultdict(tuple))
target_id = None
# Pre-populate the stress status counter for already completed target jobs.
@@ -113,6 +122,7 @@ def monitor_pipeline(
for job in pipeline.jobs.list(all=True, include_retried=True):
if target_jobs_regex.fullmatch(job.name) and job.status in ["success", "failed"]:
stress_status_counter[job.name][job.status] += 1
execution_times[job.name][job.id] = (job_duration(job), job.status, job.web_url)
while True:
deps_failed = []
@@ -121,19 +131,22 @@ def monitor_pipeline(
# target jobs
if target_jobs_regex.fullmatch(job.name):
target_id = job.id
target_status = job.status
if stress and job.status in ["success", "failed"]:
if stress and target_status in ["success", "failed"]:
if (
stress < 0
or sum(stress_status_counter[job.name].values()) < stress
):
stress_status_counter[job.name][job.status] += 1
stress_status_counter[job.name][target_status] += 1
execution_times[job.name][job.id] = (job_duration(job), target_status, job.web_url)
job = enable_job(project, pipeline, job, "retry", force_manual)
else:
execution_times[job.name][job.id] = (job_duration(job), target_status, job.web_url)
job = enable_job(project, pipeline, job, "target", force_manual)
print_job_status(job, job.status not in target_statuses[job.name])
target_statuses[job.name] = job.status
print_job_status(job, target_status not in target_statuses[job.name])
target_statuses[job.name] = target_status
continue
# all other non-target jobs
@@ -172,13 +185,13 @@ def monitor_pipeline(
if len(target_statuses) == 1 and {"running"}.intersection(
target_statuses.values()
):
return target_id, None
return target_id, None, execution_times
if (
{"failed"}.intersection(target_statuses.values())
and not set(["running", "pending"]).intersection(target_statuses.values())
):
return None, 1
return None, 1, execution_times
if (
{"skipped"}.intersection(target_statuses.values())
@@ -190,10 +203,10 @@ def monitor_pipeline(
deps_failed,
Fore.RESET,
)
return None, 1
return None, 1, execution_times
if {"success", "manual"}.issuperset(target_statuses.values()):
return None, 0
return None, 0, execution_times
pretty_wait(REFRESH_WAIT_JOBS)
@@ -410,6 +423,43 @@ def find_dependencies(token: str | None,
return target_jobs.union(dependency_jobs)
def print_monitor_summary(
execution_collection: Dict[str, Dict[int, Tuple[float, str, str]]],
t_start: float,
) -> None:
"""Summary of the test execution"""
t_end = time.perf_counter()
spend_minutes = (t_end - t_start) / 60
print(f"⏲ Duration of script execution: {spend_minutes:0.1f} minutes")
if len(execution_collection) == 0:
return
print(f"⏲ Jobs execution times:")
job_names = list(execution_collection.keys())
job_names.sort()
name_field_pad = len(max(job_names, key=len)) + 2
for name in job_names:
job_executions = execution_collection[name]
job_times = ', '.join([__job_duration_record(job_execution)
for job_execution in sorted(job_executions.items())])
print(f"* {name:{name_field_pad}}: ({len(job_executions)}) {job_times}")
def __job_duration_record(dict_item: tuple) -> str:
"""
Format each pair of job and its duration.
:param job_execution: item of execution_collection[name][idn]: Dict[int, Tuple[float, str, str]]
"""
job_id = dict_item[0] # dictionary key
job_duration, job_status, job_url = dict_item[1] # dictionary value, the tuple
return (f"{STATUS_COLORS[job_status]}"
f"{link2print(job_url, job_id)}: {pretty_duration(job_duration):>8}"
f"{Style.RESET_ALL}")
def link2print(url: str, text: str) -> str:
return f"{URL_START}{url}\a{text}{URL_END}"
if __name__ == "__main__":
try:
t_start = time.perf_counter()
@@ -486,16 +536,14 @@ if __name__ == "__main__":
iid=pipe.iid,
project_path=cur_project
)
target_job_id, ret = monitor_pipeline(
target_job_id, ret, exec_t = monitor_pipeline(
cur_project, pipe, target_jobs_regex, deps, args.force_manual, args.stress
)
if target_job_id:
print_log(cur_project, target_job_id)
t_end = time.perf_counter()
spend_minutes = (t_end - t_start) / 60
print(f"⏲ Duration of script execution: {spend_minutes:0.1f} minutes")
print_monitor_summary(exec_t, t_start)
sys.exit(ret)
except KeyboardInterrupt: