#!/usr/bin/env python3
# SPDX-License-Identifier: MIT

# Provide a markdown-formatted message summarizing the reasons why a pipeline failed.
# Marge bot can use this script to provide more helpful comments when CI fails.

# Example for running locally:
# ./bin/ci/pipeline_message.sh --project-id 176 --pipeline-id 1310098

import argparse
import asyncio
import logging
from typing import Any

import aiohttp

PER_PAGE: int = 6000


async def get_pipeline_status(
    session: aiohttp.ClientSession, project_id: str, pipeline_id: str
):
    url = f"https://gitlab.freedesktop.org/api/v4/projects/{project_id}/pipelines/{pipeline_id}"
    logging.info(f"Fetching pipeline status from {url}")
    async with session.get(url) as response:
        response.raise_for_status()
        pipeline_details = await response.json()
    return pipeline_details.get("status")


async def get_jobs_for_pipeline(
    session: aiohttp.ClientSession, project_id: str, pipeline_id: str
):
    url = f"https://gitlab.freedesktop.org/api/v4/projects/{project_id}/pipelines/{pipeline_id}/jobs"
    logging.info(url)
    jobs = []
    params = {"per_page": PER_PAGE}
    async with session.get(url, params=params) as response:
        response.raise_for_status()
        jobs = await response.json()
    return jobs


def get_problem_jobs(jobs: list[dict[str, Any]]):
    ignore_stage_list = [
        "postmerge",
        "performance",
    ]

    problem_jobs = []
    for job in jobs:
        if any(ignore.lower() in job["stage"] for ignore in ignore_stage_list):
            continue
        if job["status"] in {"failed", "canceled"}:
            problem_jobs.append(job)

    return problem_jobs


def unexpected_improvements(failed_test_array):
    if failed_test_array["unexpected_improvements"]:
        unexpected_improvements_count = len(
            failed_test_array["unexpected_improvements"]
        )
        return f" {unexpected_improvements_count} improved test{'s' if unexpected_improvements_count != 1 else ''}"
    return ""


def fails(failed_test_array):
    if failed_test_array["fails"]:
        fails_count = len(failed_test_array["fails"])
        return f" {fails_count} failed test{'s' if fails_count != 1 else ''}"
    return ""


def crashes(failed_test_array):
    if failed_test_array["crashes"]:
        crash_count = len(failed_test_array["crashes"])
        return f" {crash_count} crashed test{'s' if crash_count != 1 else ''}"
    return ""


def get_failed_test_details(failed_test_array):
    message = ""
    max_tests_to_display = 5

    if failed_test_array["unexpected_improvements"]:
        for i, test in enumerate(failed_test_array["unexpected_improvements"]):
            if i > max_tests_to_display:
                message += " \nand more...\n"
                break
            message += f"{test}\n"

    if failed_test_array["fails"]:
        for i, test in enumerate(failed_test_array["fails"]):
            if i > max_tests_to_display:
                message += " \nand more...\n"
                break
            message += f"{test}\n"

    if failed_test_array["crashes"]:
        for i, test in enumerate(failed_test_array["crashes"]):
            if i > max_tests_to_display:
                message += " \nand more...\n"
                break
            message += f"{test}\n"
" return message def get_failed_test_summary_message(failed_test_array): summary_msg = "" summary_msg += unexpected_improvements(failed_test_array) summary_msg += fails(failed_test_array) summary_msg += crashes(failed_test_array) summary_msg += "" return summary_msg def sort_failed_tests_by_status(failures_csv): failed_test_array = { "unexpected_improvements": [], "fails": [], "crashes": [], "timeouts": [], } for test in failures_csv.splitlines(): if "UnexpectedImprovement" in test: failed_test_array["unexpected_improvements"].append(test) elif "Fail" in test: failed_test_array["fails"].append(test) elif "Crash" in test: failed_test_array["crashes"].append(test) elif "Timeout" in test: failed_test_array["timeouts"].append(test) return failed_test_array async def get_failures_csv(session, project_id, job): job_id = job["id"] url = f"https://gitlab.freedesktop.org/api/v4/projects/{project_id}/jobs/{job_id}/artifacts/results/failures.csv" async with session.get(url) as response: if response.status == 200: text = await response.text() return text else: logging.debug(f"No response from: {url}") return "" async def get_test_failures(session, project_id, job): failures_csv = await get_failures_csv(session, project_id, job) if not failures_csv: return "" # If just one test failed, don't bother with more complicated sorting lines = failures_csv.splitlines() if len(lines) == 1: return ": " + lines[0] + "
" failed_test_array = sort_failed_tests_by_status(failures_csv) failures_msg = "
" failures_msg += get_failed_test_summary_message(failed_test_array) failures_msg += get_failed_test_details(failed_test_array) failures_msg += "
" return failures_msg async def get_trace_failures(session, project_id, job): project_json = await get_project_json(session, project_id) path = project_json.get("path", "") if not path: return "" job_id = job["id"] url = f"https://mesa.pages.freedesktop.org/-/{path}/-/jobs/{job_id}/artifacts/results/summary/problems.html" async with session.get(url) as response: if response.status == 200: return url else: logging.debug(f"No response from: {url}") return "" async def get_project_json(session, project_id): url_project_id = f"https://gitlab.freedesktop.org/api/v4/projects/{project_id}" async with session.get(url_project_id) as response: if response.status == 200: return await response.json() else: logging.debug(f"No response from: {url_project_id}") return "" async def get_job_log(session: aiohttp.ClientSession, project_id: str, job_id: int): project_json = await get_project_json(session, project_id) path_with_namespace = project_json.get("path_with_namespace", "") if not path_with_namespace: return "" url_job_log = ( f"https://gitlab.freedesktop.org/{path_with_namespace}/-/jobs/{job_id}/raw" ) async with session.get(url_job_log) as response: if response.status == 200: return await response.text() else: logging.debug(f"No response from job log: {url_job_log}") return "" async def search_job_log_for_errors(session, project_id, job): log_error_message = "" # Bypass these generic error messages in hopes of finding a more specific error. # The entries are case insensitive. Keep them in alphabetical order and don't # forget to add a comma after each entry ignore_list = [ "403: b", "aborting", "building c", "continuing", "error_msg : None", "error_type", "error generated", "errors generated", "exit code", "exit status", "exiting now", "job failed", "no_error", "no files to upload", "performing test", "ret code", "retry", "retry-all-errors", "strerror_", "success", "unknown-section", ] job_log = await get_job_log(session, project_id, job["id"]) for line in reversed(job_log.splitlines()): if "fatal" in line.lower(): # remove date and formatting before fatal message log_error_message = line[line.lower().find("fatal") :] break if "error" in line.lower(): if any(ignore.lower() in line.lower() for ignore in ignore_list): continue # remove date and formatting before error message log_error_message = line[line.lower().find("error") :].strip() # if there is no further info after the word error then it's not helpful # so reset the message and try again. if log_error_message.lower() in {"error", "errors", "error:", "errors:"}: log_error_message = "" continue break # timeout msg from .gitlab-ci/lava/lava_job_submitter.py if "expected to take at least" in line.lower(): log_error_message = line break return log_error_message async def process_single_job(session, project_id, job): job_url = job.get("web_url", "") if not job_url: logging.info(f"Job {job['name']} is missing a web_url") job_name = job.get("name", "Unnamed Job") message = f"[{job_name}]({job_url})" # if a job times out it's cancelled, so worth mentioning here if job["status"] == "canceled": return f"{message}: canceled
" # if it's not a script failure then all we can do is give the gitlab assigned reason if job["failure_reason"] != "script_failure": return f"{message}: {job['failure_reason']}
" test_failures = await get_test_failures(session, project_id, job) if test_failures: return f"{message}{test_failures}" trace_failures = await get_trace_failures(session, project_id, job) if trace_failures: return f"{message}: has a [trace failure]({trace_failures})
" log_error_message = await search_job_log_for_errors(session, project_id, job) if log_error_message: return f"{message}: {log_error_message}
" return f"{message}
" async def process_job_with_limit(session, project_id, job): # Use at most 10 concurrent tasks semaphore = asyncio.Semaphore(10) async with semaphore: return await process_single_job(session, project_id, job) async def process_problem_jobs(session, project_id, problem_jobs): problem_jobs_count = len(problem_jobs) if problem_jobs_count == 1: message = f"
There were problems with job: " message += await process_single_job(session, project_id, problem_jobs[0]) return message message = f"
" message += f"" message += f"There were problems with {problem_jobs_count} jobs: " message += "" tasks = [process_job_with_limit(session, project_id, job) for job in problem_jobs] results = await asyncio.gather(*tasks) for result in results: message += result message += f"
" return message async def main(pipeline_id: str, project_id: str = "176") -> str: message = "" try: timeout = aiohttp.ClientTimeout(total=120) logging.basicConfig(level=logging.INFO) async with aiohttp.ClientSession(timeout=timeout) as session: pipeline_status = await get_pipeline_status( session, project_id, pipeline_id ) logging.debug(f"Pipeline status: {pipeline_status}") if pipeline_status != "failed": return message jobs = await get_jobs_for_pipeline(session, project_id, pipeline_id) problem_jobs = get_problem_jobs(jobs) if len(problem_jobs) == 0: return message message = await process_problem_jobs(session, project_id, problem_jobs) except Exception as e: logging.error(f"An error occurred: {e}") return "" return message if __name__ == "__main__": parser = argparse.ArgumentParser(description="Fetch GitLab pipeline details") parser.add_argument( "--project-id", default="176", help="Project ID (default: 176 i.e. mesa/mesa)" ) parser.add_argument("--pipeline-id", required=True, help="Pipeline ID") args = parser.parse_args() message = asyncio.run(main(args.pipeline_id, args.project_id)) print(message)