diff --git a/bin/ci/ci_run_n_monitor.py b/bin/ci/ci_run_n_monitor.py
index 7306a21d0e4..0be9c1d48dc 100755
--- a/bin/ci/ci_run_n_monitor.py
+++ b/bin/ci/ci_run_n_monitor.py
@@ -295,7 +295,7 @@ def parse_args() -> None:
 
 def find_dependencies(target_jobs_regex: re.Pattern, project_path: str, iid: int) -> set[str]:
     gql_instance = GitlabGQL()
-    dag, _ = create_job_needs_dag(
+    dag = create_job_needs_dag(
         gql_instance, {"projectPath": project_path.path_with_namespace, "iid": iid}
     )
 
@@ -308,7 +308,10 @@ def find_dependencies(target_jobs_regex: re.Pattern, project_path: str, iid: int
     print()
     print_dag(target_dep_dag)
     print(Fore.RESET)
-    return set(chain.from_iterable(target_dep_dag.values()))
+
+    dependency_jobs = set(chain.from_iterable(d["needs"] for d in target_dep_dag.values()))
+    target_jobs = set(target_dep_dag.keys())
+    return target_jobs.union(dependency_jobs)
 
 
 if __name__ == "__main__":
diff --git a/bin/ci/gitlab_gql.py b/bin/ci/gitlab_gql.py
index 404fb9cccac..b0299b07314 100755
--- a/bin/ci/gitlab_gql.py
+++ b/bin/ci/gitlab_gql.py
@@ -5,13 +5,13 @@ import logging
 import re
 import traceback
 from argparse import ArgumentDefaultsHelpFormatter, ArgumentParser, Namespace
-from collections import OrderedDict, defaultdict
+from collections import OrderedDict
 from copy import deepcopy
 from dataclasses import dataclass, field
 from itertools import accumulate
 from os import getenv
 from pathlib import Path
-from typing import Any, Iterable, Optional, Pattern, Union
+from typing import Any, Iterable, Optional, Pattern, TypedDict, Union
 
 import yaml
 from filecache import DAY, filecache
@@ -19,7 +19,18 @@ from gql import Client, gql
 from gql.transport.requests import RequestsHTTPTransport
 from graphql import DocumentNode
 
-Dag = dict[str, set[str]]
+
+class DagNode(TypedDict):
+    needs: set[str]
+    stage: str
+    # `name` is redundant but is kept for backward compatibility
+    name: str
+
+
+# see the create_job_needs_dag function for more details
+Dag = dict[str, DagNode]
+
+
 StageSeq = OrderedDict[str, set[str]]
 TOKEN_DIR = Path(getenv("XDG_CONFIG_HOME") or Path.home() / ".config")
 
@@ -200,104 +211,139 @@ class GitlabGQL:
             logging.warning(f"Could not invalidate cache, maybe it was not used in {ex.args}?")
 
 
-def insert_early_stage_jobs(dag: Dag, stage_sequence: StageSeq, jobs_metadata: dict) -> Dag:
-    pre_processed_dag: Dag = {}
+def insert_early_stage_jobs(stage_sequence: StageSeq, jobs_metadata: Dag) -> Dag:
+    pre_processed_dag: dict[str, set[str]] = {}
     jobs_from_early_stages = list(accumulate(stage_sequence.values(), set.union))
-    for job_name, needs in dag.items():
-        final_needs: set = deepcopy(needs)
+    for job_name, metadata in jobs_metadata.items():
+        final_needs: set[str] = deepcopy(metadata["needs"])
         # Pre-process jobs that are not based on needs field
         # e.g. sanity job in mesa MR pipelines
         if not final_needs:
-            job_stage = jobs_metadata[job_name]["stage"]["name"]
-            stage_index = list(stage_sequence.keys()).index(job_stage)
+            job_stage: str = jobs_metadata[job_name]["stage"]
+            stage_index: int = list(stage_sequence.keys()).index(job_stage)
             if stage_index > 0:
                 final_needs |= jobs_from_early_stages[stage_index - 1]
 
         pre_processed_dag[job_name] = final_needs
-    return pre_processed_dag
+
+    for job_name, needs in pre_processed_dag.items():
+        jobs_metadata[job_name]["needs"] = needs
+
+    return jobs_metadata
 
 
-def traverse_dag_needs(dag: Dag) -> None:
-    for job, needs in dag.items():
-        final_needs: set = deepcopy(needs)
+def traverse_dag_needs(jobs_metadata: Dag) -> None:
+    created_jobs = set(jobs_metadata.keys())
+    for job, metadata in jobs_metadata.items():
+        final_needs: set = deepcopy(metadata["needs"]) & created_jobs
         # Post process jobs that are based on needs field
         partial = True
 
         while partial:
-            next_depth = {n for dn in final_needs for n in dag[dn]}
-            partial = not final_needs.issuperset(next_depth)
+            next_depth: set[str] = {n for dn in final_needs for n in jobs_metadata[dn]["needs"]}
+            partial: bool = not final_needs.issuperset(next_depth)
             final_needs = final_needs.union(next_depth)
 
-        dag[job] = final_needs
+        jobs_metadata[job]["needs"] = final_needs
 
 
-def extract_stages_and_job_needs(pipeline_result: dict[str, Any]) -> tuple[Dag, StageSeq, dict]:
-    incomplete_dag = defaultdict(set)
-    jobs_metadata = {}
+def extract_stages_and_job_needs(
+    pipeline_jobs: dict[str, Any], pipeline_stages: dict[str, Any]
+) -> tuple[StageSeq, Dag]:
+    jobs_metadata = Dag()
     # Record the stage sequence to post process deps that are not based on needs
     # field, for example: sanity job
     stage_sequence: OrderedDict[str, set[str]] = OrderedDict()
-    for stage in pipeline_result["stages"]["nodes"]:
-        stage_jobs: set[str] = set()
-        for stage_job in stage["groups"]["nodes"]:
-            for job in stage_job["jobs"]["nodes"]:
-                stage_jobs.add(job["name"])
-                needs = job.pop("needs")["nodes"]
-                jobs_metadata[job["name"]] = job
-                incomplete_dag[job["name"]] = {node["name"] for node in needs}
-                # ensure that all needed nodes its in the graph
-                [incomplete_dag[node["name"]] for node in needs]
-        stage_sequence[stage["name"]] = stage_jobs
+    for stage in pipeline_stages["nodes"]:
+        stage_sequence[stage["name"]] = set()
 
-    return incomplete_dag, stage_sequence, jobs_metadata
+    for job in pipeline_jobs["nodes"]:
+        stage_sequence[job["stage"]["name"]].add(job["name"])
+        dag_job: DagNode = {
+            "name": job["name"],
+            "stage": job["stage"]["name"],
+            "needs": {j["node"]["name"] for j in job["needs"]["edges"]},
+        }
+        jobs_metadata[job["name"]] = dag_job
+
+    return stage_sequence, jobs_metadata
 
 
-def create_job_needs_dag(gl_gql: GitlabGQL, params) -> tuple[Dag, dict[str, dict[str, Any]]]:
+def create_job_needs_dag(gl_gql: GitlabGQL, params, disable_cache: bool = True) -> Dag:
     """
-    The function `create_job_needs_dag` retrieves pipeline details from GitLab, extracts stages and
-    job needs, inserts early stage jobs, and returns the final DAG and job dictionary.
+    This function creates a Directed Acyclic Graph (DAG) to represent a sequence of jobs, where each
+    job has a set of jobs that it depends on (its "needs") and belongs to a certain "stage".
+    The "name" of the job is used as the key in the dictionary.
+
+    For example, consider the following pipeline:
+
+        1. build stage: job1 -> job2 -> job3
+        2. test stage: job4 (no explicit needs)
+
+    - The job needs for job2 are: job1
+    - The job needs for job3 are: job1, job2
+    - job4 has no needs field, so it has to wait for every job in the build stage to finish.
+
+    The resulting DAG would look like this:
+
+        dag = {
+            "job1": {"needs": set(), "stage": "build", "name": "job1"},
+            "job2": {"needs": {"job1"}, "stage": "build", "name": "job2"},
+            "job3": {"needs": {"job1", "job2"}, "stage": "build", "name": "job3"},
+            "job4": {"needs": {"job1", "job2", "job3"}, "stage": "test", "name": "job4"},
+        }
+
+    To access the job needs, one can do:
+
+        dag["job3"]["needs"]
+
+    This will return the set of jobs that job3 needs: {"job1", "job2"}
 
     Args:
         gl_gql (GitlabGQL): The `gl_gql` parameter is an instance of the `GitlabGQL` class, which is
             used to make GraphQL queries to the GitLab API.
-        params: The `params` parameter is a dictionary that contains the necessary parameters for
-            the GraphQL query. It is used to specify the details of the pipeline for which the job
-            needs DAG is being created.
+        params (dict): The `params` parameter is a dictionary that contains the necessary parameters
+            for the GraphQL query. It is used to specify the details of the pipeline for which the
+            job needs DAG is being created.
             The specific keys and values in the `params` dictionary will depend on the requirements
            of the GraphQL query being executed
+        disable_cache (bool): The `disable_cache` parameter is a boolean that specifies whether the
+            query cache should be bypassed so that the pipeline data is always fetched fresh.
+            It defaults to `True`.
 
     Returns:
-        The function `create_job_needs_dag` returns a tuple containing two elements.
-        The first element is the final DAG (Directed Acyclic Graph) representing the stages and job
-        dependencies.
-        The second element is a dictionary containing information about the jobs in the DAG, where
-        the keys are job names and the values are dictionaries containing additional job
-        information.
+        The final DAG (Directed Acyclic Graph) representing the job dependencies, sourced from the
+        `needs` field or from the stage ordering rule.
""" - result = gl_gql.query("pipeline_details.gql", params) - pipeline = result["project"]["pipeline"] - if not pipeline: + stages_jobs_gql = gl_gql.query( + "pipeline_details.gql", + params=params, + paginated_key_loc=["project", "pipeline", "jobs"], + disable_cache=disable_cache, + ) + pipeline_data = stages_jobs_gql["project"]["pipeline"] + if not pipeline_data: raise RuntimeError(f"Could not find any pipelines for {params}") - incomplete_dag, stage_sequence, jobs_metadata = extract_stages_and_job_needs(pipeline) + stage_sequence, jobs_metadata = extract_stages_and_job_needs( + pipeline_data["jobs"], pipeline_data["stages"] + ) # Fill the DAG with the job needs from stages that don't have any needs but still need to wait # for previous stages - final_dag = insert_early_stage_jobs(incomplete_dag, stage_sequence, jobs_metadata) + final_dag = insert_early_stage_jobs(stage_sequence, jobs_metadata) # Now that each job has its direct needs filled correctly, update the "needs" field for each job # in the DAG by performing a topological traversal traverse_dag_needs(final_dag) - return final_dag, jobs_metadata + return final_dag def filter_dag(dag: Dag, regex: Pattern) -> Dag: - return {job: needs for job, needs in dag.items() if regex.match(job)} + jobs_with_regex: set[str] = {job for job in dag if regex.match(job)} + return Dag({job: data for job, data in dag.items() if job in sorted(jobs_with_regex)}) def print_dag(dag: Dag) -> None: - for job, needs in dag.items(): + for job, data in dag.items(): print(f"{job}:") - print(f"\t{' '.join(needs)}") + print(f"\t{' '.join(data['needs'])}") print() diff --git a/bin/ci/pipeline_details.gql b/bin/ci/pipeline_details.gql index f723084e4cf..2c8be9fc5ba 100644 --- a/bin/ci/pipeline_details.gql +++ b/bin/ci/pipeline_details.gql @@ -1,4 +1,4 @@ -query getPipelineDetails($projectPath: ID!, $iid: ID!) { +query jobs($projectPath: ID!, $iid: ID!, $cursor: String) { project(fullPath: $projectPath) { id pipeline(iid: $iid) { @@ -8,25 +8,26 @@ query getPipelineDetails($projectPath: ID!, $iid: ID!) { stages { nodes { name - groups { - nodes { - jobs { - nodes { - id - name - stage { - name - } - needs { - nodes { - id - name - } - } - } + } + } + jobs(after: $cursor) { + pageInfo { + hasNextPage + endCursor + } + count + nodes { + name + needs { + edges { + node { + name } } } + stage { + name + } } } }