ci/bin: Refactor create_job_needs_dag

The function is getting too big, let's add comments, docstrings to the
most important function, new type hints and extract methods from it to
make it easier to read.

Signed-off-by: Guilherme Gallo <guilherme.gallo@collabora.com>
Part-of: <https://gitlab.freedesktop.org/mesa/mesa/-/merge_requests/25858>
This commit is contained in:
Guilherme Gallo
2023-10-24 00:33:31 -03:00
committed by Marge Bot
parent 4e4743ec0a
commit 969ede4521

View File

@@ -18,6 +18,7 @@ from gql.transport.aiohttp import AIOHTTPTransport
from graphql import DocumentNode
Dag = dict[str, set[str]]
StageSeq = OrderedDict[str, set[str]]
TOKEN_DIR = Path(getenv("XDG_CONFIG_HOME") or Path.home() / ".config")
@@ -83,57 +84,92 @@ class GitlabGQL:
self.query._db.clear()
def create_job_needs_dag(
gl_gql: GitlabGQL, params
) -> tuple[Dag, dict[str, dict[str, Any]]]:
result = gl_gql.query("pipeline_details.gql", params)
incomplete_dag = defaultdict(set)
jobs = {}
pipeline = result["project"]["pipeline"]
if not pipeline:
raise RuntimeError(f"Could not find any pipelines for {params}")
# Record the stage sequence to post process deps that are not based on needs
# field, for example: sanity job
stage_sequence: OrderedDict[str, set[str]] = OrderedDict()
for stage in pipeline["stages"]["nodes"]:
stage_jobs: set[str] = set()
for stage_job in stage["groups"]["nodes"]:
for job in stage_job["jobs"]["nodes"]:
stage_jobs.add(job["name"])
needs = job.pop("needs")["nodes"]
jobs[job["name"]] = job
incomplete_dag[job["name"]] = {node["name"] for node in needs}
# ensure that all needed nodes its in the graph
[incomplete_dag[node["name"]] for node in needs]
stage_sequence[stage["name"]] = stage_jobs
def insert_early_stage_jobs(dag: Dag, stage_sequence: StageSeq, jobs_metadata: dict) -> Dag:
pre_processed_dag: Dag = {}
final_dag: Dag = {}
for job, needs in incomplete_dag.items():
jobs_from_early_stages = list(accumulate(stage_sequence.values(), set.union))
for job_name, needs in dag.items():
final_needs: set = deepcopy(needs)
# Pre-process jobs that are not based on needs field
# e.g. sanity job in mesa MR pipelines
if not final_needs:
for stage_index, stage_jobs in enumerate(stage_sequence.values()):
if job in stage_jobs:
break
job_stage = jobs_metadata[job_name]["stage"]["name"]
stage_index = list(stage_sequence.keys()).index(job_stage)
if stage_index > 0:
final_needs |= jobs_from_early_stages[stage_index - 1]
pre_processed_dag[job_name] = final_needs
for prev_stage, prev_stage_jobs in list(stage_sequence.items())[:stage_index]:
final_needs |= prev_stage_jobs
pre_processed_dag[job] = final_needs
return pre_processed_dag
for job, needs in pre_processed_dag.items():
def traverse_dag_needs(dag: Dag) -> None:
for job, needs in dag.items():
final_needs: set = deepcopy(needs)
# Post process jobs that are based on needs field
partial = True
while partial:
next_depth = {n for dn in final_needs for n in pre_processed_dag[dn]}
next_depth = {n for dn in final_needs for n in dag[dn]}
partial = not final_needs.issuperset(next_depth)
final_needs = final_needs.union(next_depth)
final_dag[job] = final_needs
dag[job] = final_needs
def extract_stages_and_job_needs(pipeline_result: dict[str, Any]) -> tuple[Dag, StageSeq, dict]:
incomplete_dag = defaultdict(set)
jobs_metadata = {}
# Record the stage sequence to post process deps that are not based on needs
# field, for example: sanity job
stage_sequence: OrderedDict[str, set[str]] = OrderedDict()
for stage in pipeline_result["stages"]["nodes"]:
stage_jobs: set[str] = set()
for stage_job in stage["groups"]["nodes"]:
for job in stage_job["jobs"]["nodes"]:
stage_jobs.add(job["name"])
needs = job.pop("needs")["nodes"]
jobs_metadata[job["name"]] = job
incomplete_dag[job["name"]] = {node["name"] for node in needs}
# ensure that all needed nodes its in the graph
[incomplete_dag[node["name"]] for node in needs]
stage_sequence[stage["name"]] = stage_jobs
return incomplete_dag, stage_sequence, jobs_metadata
def create_job_needs_dag(gl_gql: GitlabGQL, params) -> tuple[Dag, dict[str, dict[str, Any]]]:
"""
The function `create_job_needs_dag` retrieves pipeline details from GitLab, extracts stages and
job needs, inserts early stage jobs, and returns the final DAG and job dictionary.
Args:
gl_gql (GitlabGQL): The `gl_gql` parameter is an instance of the `GitlabGQL` class, which is
used to make GraphQL queries to the GitLab API.
params: The `params` parameter is a dictionary that contains the necessary parameters for
the GraphQL query. It is used to specify the details of the pipeline for which the job
needs DAG is being created.
The specific keys and values in the `params` dictionary will depend on
the requirements of the GraphQL query being executed
Returns:
The function `create_job_needs_dag` returns a tuple containing two elements.
The first element is the final DAG (Directed Acyclic Graph) representing the stages and job
dependencies.
The second element is a dictionary containing information about the jobs in the DAG, where
the keys are job names and the values are dictionaries containing additional job
information.
"""
result = gl_gql.query("pipeline_details.gql", params)
pipeline = result["project"]["pipeline"]
if not pipeline:
raise RuntimeError(f"Could not find any pipelines for {params}")
incomplete_dag, stage_sequence, jobs_metadata = extract_stages_and_job_needs(pipeline)
# Fill the DAG with the job needs from stages that don't have any needs but still need to wait
# for previous stages
final_dag = insert_early_stage_jobs(incomplete_dag, stage_sequence, jobs_metadata)
# Now that each job has its direct needs filled correctly, update the "needs" field for each job
# in the DAG by performing a topological traversal
traverse_dag_needs(final_dag)
return final_dag, jobs_metadata