From 0c092dc5c4ec9501e8ca1496bb8d4b6a08850ac6 Mon Sep 17 00:00:00 2001 From: Sergi Blanch Torne Date: Thu, 23 May 2024 14:03:50 +0200 Subject: [PATCH] ci: run_n_monitor, collect and summarize The job duration is printed during the test progress. It can be collected and summarized at the end of the process. It is interesting when doing a stress test as one will have this information together at the end. Signed-off-by: Sergi Blanch Torne Part-of: --- bin/ci/ci_run_n_monitor.py | 90 +++++++++++++++++++++++++++++--------- 1 file changed, 69 insertions(+), 21 deletions(-) diff --git a/bin/ci/ci_run_n_monitor.py b/bin/ci/ci_run_n_monitor.py index e8dd6c85fac..6c293725ac0 100755 --- a/bin/ci/ci_run_n_monitor.py +++ b/bin/ci/ci_run_n_monitor.py @@ -21,7 +21,7 @@ from concurrent.futures import ThreadPoolExecutor from functools import partial from itertools import chain from subprocess import check_output, CalledProcessError -from typing import TYPE_CHECKING, Iterable, Literal, Optional +from typing import Dict, TYPE_CHECKING, Iterable, Literal, Optional, Tuple import gitlab import gitlab.v4.objects @@ -69,23 +69,31 @@ def print_job_status(job, new_status=False) -> None: if new_status and job.status == "created": return - if job.duration: - duration = job.duration - elif job.started_at: - duration = time.perf_counter() - time.mktime(job.started_at.timetuple()) + duration = job_duration(job) print( STATUS_COLORS[job.status] + "🞋 job " - + URL_START - + f"{job.web_url}\a{job.name}" - + URL_END + + link2print(job.web_url, job.name) + (f" has new status: {job.status}" if new_status else f" :: {job.status}") + (f" ({pretty_duration(duration)})" if job.started_at else "") + Style.RESET_ALL ) +def job_duration(job: gitlab.v4.objects.ProjectPipelineJob) -> float: + """ + Given a job, report the time lapsed in execution. + :param job: Pipeline job + :return: Current time in execution + """ + if job.duration: + return job.duration + elif job.started_at: + return time.perf_counter() - time.mktime(job.started_at.timetuple()) + return 0.0 + + def pretty_wait(sec: int) -> None: """shows progressbar in dots""" for val in range(sec, 0, -1): @@ -100,11 +108,12 @@ def monitor_pipeline( dependencies, force_manual: bool, stress: int, -) -> tuple[Optional[int], Optional[int]]: +) -> tuple[Optional[int], Optional[int], Dict[str, Dict[int, Tuple[float, str, str]]]]: """Monitors pipeline and delegate canceling jobs""" statuses: dict[str, str] = defaultdict(str) target_statuses: dict[str, str] = defaultdict(str) stress_status_counter = defaultdict(lambda: defaultdict(int)) + execution_times = defaultdict(lambda: defaultdict(tuple)) target_id = None # Pre-populate the stress status counter for already completed target jobs. @@ -113,6 +122,7 @@ def monitor_pipeline( for job in pipeline.jobs.list(all=True, include_retried=True): if target_jobs_regex.fullmatch(job.name) and job.status in ["success", "failed"]: stress_status_counter[job.name][job.status] += 1 + execution_times[job.name][job.id] = (job_duration(job), job.status, job.web_url) while True: deps_failed = [] @@ -121,19 +131,22 @@ def monitor_pipeline( # target jobs if target_jobs_regex.fullmatch(job.name): target_id = job.id + target_status = job.status - if stress and job.status in ["success", "failed"]: + if stress and target_status in ["success", "failed"]: if ( stress < 0 or sum(stress_status_counter[job.name].values()) < stress ): - stress_status_counter[job.name][job.status] += 1 + stress_status_counter[job.name][target_status] += 1 + execution_times[job.name][job.id] = (job_duration(job), target_status, job.web_url) job = enable_job(project, pipeline, job, "retry", force_manual) else: + execution_times[job.name][job.id] = (job_duration(job), target_status, job.web_url) job = enable_job(project, pipeline, job, "target", force_manual) - print_job_status(job, job.status not in target_statuses[job.name]) - target_statuses[job.name] = job.status + print_job_status(job, target_status not in target_statuses[job.name]) + target_statuses[job.name] = target_status continue # all other non-target jobs @@ -172,13 +185,13 @@ def monitor_pipeline( if len(target_statuses) == 1 and {"running"}.intersection( target_statuses.values() ): - return target_id, None + return target_id, None, execution_times if ( {"failed"}.intersection(target_statuses.values()) and not set(["running", "pending"]).intersection(target_statuses.values()) ): - return None, 1 + return None, 1, execution_times if ( {"skipped"}.intersection(target_statuses.values()) @@ -190,10 +203,10 @@ def monitor_pipeline( deps_failed, Fore.RESET, ) - return None, 1 + return None, 1, execution_times if {"success", "manual"}.issuperset(target_statuses.values()): - return None, 0 + return None, 0, execution_times pretty_wait(REFRESH_WAIT_JOBS) @@ -410,6 +423,43 @@ def find_dependencies(token: str | None, return target_jobs.union(dependency_jobs) +def print_monitor_summary( + execution_collection: Dict[str, Dict[int, Tuple[float, str, str]]], + t_start: float, +) -> None: + """Summary of the test execution""" + t_end = time.perf_counter() + spend_minutes = (t_end - t_start) / 60 + print(f"⏲ Duration of script execution: {spend_minutes:0.1f} minutes") + if len(execution_collection) == 0: + return + print(f"⏲ Jobs execution times:") + job_names = list(execution_collection.keys()) + job_names.sort() + name_field_pad = len(max(job_names, key=len)) + 2 + for name in job_names: + job_executions = execution_collection[name] + job_times = ', '.join([__job_duration_record(job_execution) + for job_execution in sorted(job_executions.items())]) + print(f"* {name:{name_field_pad}}: ({len(job_executions)}) {job_times}") + + +def __job_duration_record(dict_item: tuple) -> str: + """ + Format each pair of job and its duration. + :param job_execution: item of execution_collection[name][idn]: Dict[int, Tuple[float, str, str]] + """ + job_id = dict_item[0] # dictionary key + job_duration, job_status, job_url = dict_item[1] # dictionary value, the tuple + return (f"{STATUS_COLORS[job_status]}" + f"{link2print(job_url, job_id)}: {pretty_duration(job_duration):>8}" + f"{Style.RESET_ALL}") + + +def link2print(url: str, text: str) -> str: + return f"{URL_START}{url}\a{text}{URL_END}" + + if __name__ == "__main__": try: t_start = time.perf_counter() @@ -486,16 +536,14 @@ if __name__ == "__main__": iid=pipe.iid, project_path=cur_project ) - target_job_id, ret = monitor_pipeline( + target_job_id, ret, exec_t = monitor_pipeline( cur_project, pipe, target_jobs_regex, deps, args.force_manual, args.stress ) if target_job_id: print_log(cur_project, target_job_id) - t_end = time.perf_counter() - spend_minutes = (t_end - t_start) / 60 - print(f"⏲ Duration of script execution: {spend_minutes:0.1f} minutes") + print_monitor_summary(exec_t, t_start) sys.exit(ret) except KeyboardInterrupt: