
Some automatic jobs, such as 'rustfmt' and 'clang-format', are skipped during the graph sweep because their parents are already included in the node set. This commit ensures that all visited jobs are in the DAG, and fixes modification-during-iteration by using deepcopy. Closes: https://gitlab.freedesktop.org/mesa/mesa/-/issues/9376 Signed-off-by: Guilherme Gallo <guilherme.gallo@collabora.com> Acked-by: David Heidelberg <david.heidelberg@collabora.com> Reviewed-by: Eric Engestrom <eric@engestrom.ch> Part-of: <https://gitlab.freedesktop.org/mesa/mesa/-/merge_requests/24176>
311 lines
9.6 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
# For the dependencies, see the requirements.txt
|
|
|
|
import re
|
|
from argparse import ArgumentDefaultsHelpFormatter, ArgumentParser, Namespace
|
|
from collections import defaultdict
|
|
from copy import deepcopy
|
|
from dataclasses import dataclass, field
|
|
from os import getenv
|
|
from pathlib import Path
|
|
from typing import Any, Iterable, Optional, Pattern, Union
|
|
|
|
import yaml
|
|
from filecache import DAY, filecache
|
|
from gql import Client, gql
|
|
from gql.transport.aiohttp import AIOHTTPTransport
|
|
from graphql import DocumentNode
|
|
|
|
Dag = dict[str, set[str]]
|
|
TOKEN_DIR = Path(getenv("XDG_CONFIG_HOME") or Path.home() / ".config")
|
|
|
|
|
|
def get_token_from_default_dir() -> str:
    """Return the default GitLab token file path (TOKEN_DIR/gitlab-token).

    Prints a hint and re-raises FileNotFoundError when the file does not
    exist, so the user knows to pass a token file as an argument instead.
    """
    token_file = TOKEN_DIR / "gitlab-token"
    try:
        # strict=True makes resolve() actually raise when the file is absent;
        # without it resolve() never raises, leaving the except branch (and
        # its helpful message) unreachable dead code.
        return str(token_file.resolve(strict=True))
    except FileNotFoundError as ex:
        print(
            f"Could not find {token_file}, please provide a token file as an argument"
        )
        raise ex
|
|
|
|
|
|
def get_project_root_dir():
    """Return the repository root, i.e. three directories above this script.

    Sanity-checks the location by asserting that .gitlab-ci.yml exists there.
    """
    script_dir = Path(__file__).parent
    root_path = script_dir.parent.parent.resolve()
    assert (root_path / ".gitlab-ci.yml").exists()

    return root_path
|
|
|
|
|
|
@dataclass
class GitlabGQL:
    """Thin wrapper around the GitLab GraphQL endpoint.

    Responses from query() are cached on disk for one day via filecache.
    """

    _transport: Any = field(init=False)
    client: Client = field(init=False)
    url: str = "https://gitlab.freedesktop.org/api/graphql"
    token: Optional[str] = None

    def __post_init__(self):
        self._setup_gitlab_gql_client()

    def _setup_gitlab_gql_client(self) -> Client:
        # Select your transport with a defined url endpoint
        headers = {"Authorization": f"Bearer {self.token}"} if self.token else {}
        self._transport = AIOHTTPTransport(
            url=self.url,
            headers=headers,
            client_session_args={"trust_env": True},
        )

        # Create a GraphQL client using the defined transport
        self.client = Client(
            transport=self._transport, fetch_schema_from_transport=True
        )

    @filecache(DAY)
    def query(
        self, gql_file: Union[Path, str], params: dict[str, Any]
    ) -> dict[str, Any]:
        """Execute the GraphQL query stored in *gql_file* (relative to this
        script's directory) with *params* as variables and return the result."""
        query_path = Path(__file__).parent / gql_file

        document: DocumentNode = gql(query_path.read_text())

        # Execute the query on the transport
        return self.client.execute(document, variable_values=params)

    def invalidate_query_cache(self):
        """Drop the on-disk filecache so the next query() hits the server."""
        self.query._db.clear()
|
|
|
|
|
|
def create_job_needs_dag(
    gl_gql: GitlabGQL, params
) -> tuple[Dag, dict[str, dict[str, Any]]]:
    """Build the transitive job-needs DAG for a pipeline.

    Queries pipeline_details.gql with *params* and returns:
      * a Dag mapping every job name to the full (transitive) set of job
        names it needs, and
      * per-job metadata keyed by job name (the raw GraphQL job node with
        its "needs" field popped off here).

    Raises RuntimeError when no pipeline matches *params*.
    """
    result = gl_gql.query("pipeline_details.gql", params)
    incomplete_dag = defaultdict(set)
    jobs = {}
    pipeline = result["project"]["pipeline"]
    if not pipeline:
        raise RuntimeError(f"Could not find any pipelines for {params}")

    for stage in pipeline["stages"]["nodes"]:
        for stage_job in stage["groups"]["nodes"]:
            for job in stage_job["jobs"]["nodes"]:
                needs = job.pop("needs")["nodes"]
                jobs[job["name"]] = job
                needed_names = {node["name"] for node in needs}
                incomplete_dag[job["name"]] = needed_names
                # Ensure every needed node is present in the graph, even when
                # it never appears as a job itself (e.g. skipped jobs).  An
                # explicit loop replaces the previous side-effect-only list
                # comprehension.
                for name in needed_names:
                    if name not in incomplete_dag:
                        incomplete_dag[name] = set()

    final_dag: Dag = {}
    for job, needs in incomplete_dag.items():
        # Copy so the expansion below never mutates the set stored in
        # incomplete_dag while we iterate.  The elements are plain strings,
        # so a shallow copy is sufficient (deepcopy was overkill).
        final_needs: set = set(needs)
        partial = True

        # Expand the needs breadth-first until a fixed point: stop once a
        # pass adds no job that was not already known.
        while partial:
            next_depth = {n for dn in final_needs for n in incomplete_dag[dn]}
            partial = not final_needs.issuperset(next_depth)
            final_needs = final_needs.union(next_depth)

        final_dag[job] = final_needs

    return final_dag, jobs
|
|
|
|
|
|
def filter_dag(dag: Dag, regex: Pattern) -> Dag:
    """Keep only the jobs whose name matches *regex* (anchored at the start)."""
    filtered: Dag = {}
    for job_name, job_needs in dag.items():
        if re.match(regex, job_name):
            filtered[job_name] = job_needs
    return filtered
|
|
|
|
|
|
def print_dag(dag: Dag) -> None:
    """Pretty-print the DAG: each job name, an indented line with its needs,
    then a blank separator line."""
    for job_name, job_needs in dag.items():
        print(f"{job_name}:", "\t" + " ".join(job_needs), "", sep="\n")
|
|
|
|
|
|
def fetch_merged_yaml(gl_gql: GitlabGQL, params) -> dict[str, Any]:
    """Ask GitLab to merge the local .gitlab-ci.yml (includes resolved) for
    the SHA in *params* and return it parsed as a dict.

    Note: mutates *params* by adding the "content" key.  On failure the
    cached query result is invalidated before raising, so a retry after a
    `git push` is not served the stale bad answer.
    """
    gitlab_yml_file = get_project_root_dir() / ".gitlab-ci.yml"
    # get_project_root_dir() already yields a Path; no re-wrapping needed.
    params["content"] = gitlab_yml_file.read_text().strip()
    raw_response = gl_gql.query("job_details.gql", params)
    if merged_yaml := raw_response["ciConfig"]["mergedYaml"]:
        return yaml.safe_load(merged_yaml)

    gl_gql.invalidate_query_cache()
    raise ValueError(
        """
    Could not fetch any content for merged YAML,
    please verify if the git SHA exists in remote.
    Maybe you forgot to `git push`? """
    )
|
|
|
|
|
|
def recursive_fill(job, relationship_field, target_data, acc_data: dict, merged_yaml):
    """Accumulate the *target_data* dicts along a job's *relationship_field* chain.

    Walks parents first (e.g. relationship_field="extends"), then merges this
    job's own *target_data* section (e.g. "variables") on top, so child values
    override parent values.  Returns the accumulated dict.
    """
    if relatives := job.get(relationship_field):
        # A single parent may be given as a plain string; normalize to a list.
        if isinstance(relatives, str):
            relatives = [relatives]

        for relative in relatives:
            parent_job = merged_yaml[relative]
            # Bug fix: the recursive call was missing relationship_field and
            # target_data, so any actual recursion raised TypeError.
            acc_data = recursive_fill(
                parent_job, relationship_field, target_data, acc_data, merged_yaml
            )

    acc_data |= job.get(target_data, {})

    return acc_data
|
|
|
|
|
|
def get_variables(job, merged_yaml, project_path, sha) -> dict[str, str]:
    """Compute the effective variables for *job*.

    Layers, later entries overriding earlier ones: image-tags.yml variables,
    the merged YAML's global variables, the job's own variables, then the
    CI_* values GitLab would inject.  $VAR / ${VAR} references are expanded
    until a fixed point is reached.
    """
    p = get_project_root_dir() / ".gitlab-ci" / "image-tags.yml"
    image_tags = yaml.safe_load(p.read_text())

    variables = image_tags["variables"]
    variables |= merged_yaml["variables"]
    # Not every job declares a "variables" section; treat a missing one as
    # empty instead of raising KeyError.
    variables |= job.get("variables", {})
    variables["CI_PROJECT_PATH"] = project_path
    variables["CI_PROJECT_NAME"] = project_path.split("/")[1]
    variables["CI_REGISTRY_IMAGE"] = "registry.freedesktop.org/${CI_PROJECT_PATH}"
    variables["CI_COMMIT_SHA"] = sha

    # Keep substituting until one pass makes no further replacement.
    while recurse_among_variables_space(variables):
        pass

    return variables
|
|
|
|
|
|
# Based on: https://stackoverflow.com/a/2158532/1079223
def flatten(xs):
    """Depth-first flatten of arbitrarily nested iterables.

    Strings and bytes are treated as atoms, not as iterables.
    """
    for item in xs:
        if isinstance(item, (str, bytes)) or not isinstance(item, Iterable):
            yield item
        else:
            yield from flatten(item)
|
|
|
|
|
|
def get_full_script(job) -> list[str]:
    """Concatenate the job's before_script, script and after_script into one
    flat list of lines, each section preceded by a marker comment and
    followed by a blank line."""
    script: list[str] = []
    for section in ("before_script", "script", "after_script"):
        script.append(f"# {section}")
        script.extend(flatten(job.get(section, [])))
        script.append("")

    return script
|
|
|
|
|
|
def recurse_among_variables_space(var_graph) -> bool:
    """Perform one substitution pass over *var_graph* in place.

    For every variable whose value references another variable in the graph
    ($NAME or ${NAME}), replace the reference with that variable's current
    value.  Returns True when the caller should run another pass (see the
    `updated` note below); the caller loops until this returns False.
    """
    updated = False
    for var, value in var_graph.items():
        # Values may be non-strings (e.g. ints from YAML); stringify for the
        # regex scan.  Note the original value in var_graph is kept as-is
        # unless a substitution happens below.
        value = str(value)
        dep_vars = []
        if match := re.findall(r"(\$[{]?[\w\d_]*[}]?)", value):
            # Strip the $ / ${ } decoration to get bare variable names.
            all_dep_vars = [v.lstrip("${").rstrip("}") for v in match]
            # print(value, match, all_dep_vars)
            # Only substitute references that resolve within this graph;
            # unknown variables are left untouched.
            dep_vars = [v for v in all_dep_vars if v in var_graph]

        for dep_var in dep_vars:
            dep_value = str(var_graph[dep_var])
            new_value = var_graph[var]
            # Replace the braced form first so "${FOO}" is not half-eaten by
            # the "$FOO" replacement.
            new_value = new_value.replace(f"${{{dep_var}}}", dep_value)
            new_value = new_value.replace(f"${dep_var}", dep_value)
            # Safe to assign while iterating items(): only values change,
            # never the key set.
            var_graph[var] = new_value
            # NOTE(review): this flags another pass when the substituted
            # value still differs from the dependency's value — presumably a
            # heuristic for "the dependency itself still contains references";
            # confirm it terminates for all inputs before changing it.
            updated |= dep_value != new_value

    return updated
|
|
|
|
|
|
def get_job_final_definition(job_name, merged_yaml, project_path, sha):
    """Print the fully-resolved definition of *job_name*: its exported
    variables, the concatenated script, and (when set) the container image."""
    job = merged_yaml[job_name]
    variables = get_variables(job, merged_yaml, project_path, sha)

    print("# --------- variables ---------------")
    for var_name in sorted(variables):
        print(f"export {var_name}={variables[var_name]!r}")

    # TODO: Recurse into needs to get full script
    # TODO: maybe create a extra yaml file to avoid too much rework
    script = get_full_script(job)
    print("\n\n# --------- full script ---------------")
    print("\n".join(script))

    if image := variables.get("MESA_IMAGE"):
        print("\n\n# --------- container image ---------------")
        print(image)
|
|
|
|
|
|
def parse_args() -> Namespace:
    """Parse command-line arguments.

    Also reads the GitLab token file and stores its contents in
    args.gitlab_token for the caller.
    """
    parser = ArgumentParser(
        formatter_class=ArgumentDefaultsHelpFormatter,
        description="CLI and library with utility functions to debug jobs via Gitlab GraphQL",
        epilog=f"""Example:
        {Path(__file__).name} --rev $(git rev-parse HEAD) --print-job-dag""",
    )
    parser.add_argument("-pp", "--project-path", type=str, default="mesa/mesa")
    parser.add_argument("--sha", "--rev", type=str, required=True)
    parser.add_argument(
        "--regex",
        type=str,
        required=False,
        help="Regex pattern for the job name to be considered",
    )
    parser.add_argument("--print-dag", action="store_true", help="Print job needs DAG")
    parser.add_argument(
        "--print-merged-yaml",
        action="store_true",
        help="Print the resulting YAML for the specific SHA",
    )
    parser.add_argument(
        "--print-job-manifest", type=str, help="Print the resulting job data"
    )
    parser.add_argument(
        "--gitlab-token-file",
        type=str,
        default=get_token_from_default_dir(),
        help="force GitLab token, otherwise it's read from $XDG_CONFIG_HOME/gitlab-token",
    )

    args = parser.parse_args()
    # strip() drops the trailing newline most editors leave in the token
    # file; a newline in the token would produce an invalid
    # "Authorization" HTTP header.
    args.gitlab_token = Path(args.gitlab_token_file).read_text().strip()
    return args
|
|
|
|
|
|
def main():
    """Entry point: run the report(s) selected by the command-line flags."""
    args = parse_args()
    gl_gql = GitlabGQL(token=args.gitlab_token)

    def pipeline_params():
        # Fresh dict per call: fetch_merged_yaml() mutates its params argument.
        return {"projectPath": args.project_path, "sha": args.sha}

    if args.print_dag:
        dag, _ = create_job_needs_dag(gl_gql, pipeline_params())

        if args.regex:
            dag = filter_dag(dag, re.compile(args.regex))
        print_dag(dag)

    if args.print_merged_yaml:
        print(fetch_merged_yaml(gl_gql, pipeline_params()))

    if args.print_job_manifest:
        merged_yaml = fetch_merged_yaml(gl_gql, pipeline_params())
        get_job_final_definition(
            args.print_job_manifest, merged_yaml, args.project_path, args.sha
        )
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|