diff --git a/bin/ci/gitlab_gql.py b/bin/ci/gitlab_gql.py
index cf07d59de08..b03309da652 100755
--- a/bin/ci/gitlab_gql.py
+++ b/bin/ci/gitlab_gql.py
@@ -3,9 +3,9 @@
 
 import re
 from argparse import ArgumentDefaultsHelpFormatter, ArgumentParser, Namespace
-from collections import defaultdict
+from collections import OrderedDict, defaultdict
 from copy import deepcopy
 from dataclasses import dataclass, field
 from os import getenv
 from pathlib import Path
 from typing import Any, Iterable, Optional, Pattern, Union
@@ -85,7 +85,6 @@ class GitlabGQL:
 def create_job_needs_dag(
     gl_gql: GitlabGQL, params
 ) -> tuple[Dag, dict[str, dict[str, Any]]]:
-
     result = gl_gql.query("pipeline_details.gql", params)
     incomplete_dag = defaultdict(set)
-    jobs = {}
+    jobs_metadata = {}
@@ -93,28 +92,49 @@ def create_job_needs_dag(
     if not pipeline:
         raise RuntimeError(f"Could not find any pipelines for {params}")
 
+    # Record the stage sequence to post-process dependencies that are not
+    # based on the needs field, e.g. the sanity job
+    stage_sequence: OrderedDict[str, set[str]] = OrderedDict()
     for stage in pipeline["stages"]["nodes"]:
+        stage_jobs: set[str] = set()
         for stage_job in stage["groups"]["nodes"]:
             for job in stage_job["jobs"]["nodes"]:
+                stage_jobs.add(job["name"])
                 needs = job.pop("needs")["nodes"]
-                jobs[job["name"]] = job
+                jobs_metadata[job["name"]] = job
                 incomplete_dag[job["name"]] = {node["name"] for node in needs}
                 # ensure that all needed nodes its in the graph
                 [incomplete_dag[node["name"]] for node in needs]
+        stage_sequence[stage["name"]] = stage_jobs
 
+    pre_processed_dag: Dag = {}
     final_dag: Dag = {}
     for job, needs in incomplete_dag.items():
         final_needs: set = deepcopy(needs)
+        # Pre-process jobs whose dependencies are not based on the needs field,
+        # e.g. the sanity job in Mesa MR pipelines
+        if not final_needs:
+            for stage_index, stage_jobs in enumerate(stage_sequence.values()):
+                if job in stage_jobs:
+                    break
+
+            for prev_stage_jobs in list(stage_sequence.values())[:stage_index]:
+                final_needs |= prev_stage_jobs
+        pre_processed_dag[job] = final_needs
+
+    for job, needs in pre_processed_dag.items():
+        final_needs: set = deepcopy(needs)
+        # Post-process jobs that are based on the needs field
         partial = True
 
         while partial:
-            next_depth = {n for dn in final_needs for n in incomplete_dag[dn]}
+            next_depth = {n for dn in final_needs for n in pre_processed_dag[dn]}
             partial = not final_needs.issuperset(next_depth)
             final_needs = final_needs.union(next_depth)
 
         final_dag[job] = final_needs
 
-    return final_dag, jobs
+    return final_dag, jobs_metadata
 
 
 def filter_dag(dag: Dag, regex: Pattern) -> Dag:
diff --git a/bin/ci/pipeline_details.gql b/bin/ci/pipeline_details.gql
index d514d259b6b..f723084e4cf 100644
--- a/bin/ci/pipeline_details.gql
+++ b/bin/ci/pipeline_details.gql
@@ -7,13 +7,16 @@ query getPipelineDetails($projectPath: ID!, $iid: ID!) {
       complete
       stages {
         nodes {
-          name,
+          name
           groups {
             nodes {
               jobs {
                 nodes {
                   id
                   name
+                  stage {
+                    name
+                  }
                   needs {
                     nodes {
                       id
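
For reviewers: below is a minimal, self-contained sketch (not part of the patch) of the parsing and two-pass expansion that create_job_needs_dag now performs, run against a hand-written payload shaped like the pipeline_details.gql response. The stage and job names (sanity, build-x86, test-x86) are made up for illustration, not taken from Mesa CI.

# Sketch of the two-pass DAG expansion above, on a made-up three-stage pipeline.
from collections import OrderedDict, defaultdict

pipeline = {
    "stages": {"nodes": [
        {"name": "sanity", "groups": {"nodes": [
            {"jobs": {"nodes": [
                {"id": "1", "name": "sanity", "needs": {"nodes": []}},
            ]}},
        ]}},
        {"name": "build", "groups": {"nodes": [
            {"jobs": {"nodes": [
                {"id": "2", "name": "build-x86", "needs": {"nodes": []}},
            ]}},
        ]}},
        {"name": "test", "groups": {"nodes": [
            {"jobs": {"nodes": [
                {"id": "3", "name": "test-x86",
                 "needs": {"nodes": [{"id": "2", "name": "build-x86"}]}},
            ]}},
        ]}},
    ]},
}

incomplete_dag = defaultdict(set)
jobs_metadata = {}  # mirrors the second value returned by create_job_needs_dag
stage_sequence: OrderedDict = OrderedDict()
for stage in pipeline["stages"]["nodes"]:
    stage_jobs = set()
    for group in stage["groups"]["nodes"]:
        for job in group["jobs"]["nodes"]:
            stage_jobs.add(job["name"])
            needs = job.pop("needs")["nodes"]
            jobs_metadata[job["name"]] = job
            incomplete_dag[job["name"]] = {node["name"] for node in needs}
            # touch the keys so every needed job has an entry in the defaultdict
            [incomplete_dag[node["name"]] for node in needs]
    stage_sequence[stage["name"]] = stage_jobs

# Pass 1: a job without explicit needs waits on every job of the earlier stages.
pre_processed_dag = {}
for job_name, needs in incomplete_dag.items():
    final_needs = set(needs)
    if not final_needs:
        for stage_index, stage_jobs in enumerate(stage_sequence.values()):
            if job_name in stage_jobs:
                break
        for prev_stage_jobs in list(stage_sequence.values())[:stage_index]:
            final_needs |= prev_stage_jobs
    pre_processed_dag[job_name] = final_needs

# Pass 2: expand the needs transitively until a fixed point is reached.
final_dag = {}
for job_name, needs in pre_processed_dag.items():
    final_needs = set(needs)
    partial = True
    while partial:
        next_depth = {n for dn in final_needs for n in pre_processed_dag[dn]}
        partial = not final_needs.issuperset(next_depth)
        final_needs = final_needs.union(next_depth)
    final_dag[job_name] = final_needs

print(final_dag)
# {'sanity': set(), 'build-x86': {'sanity'}, 'test-x86': {'sanity', 'build-x86'}}
# (set ordering may vary)

The fixed-point loop in pass 2 always terminates: final_needs only ever grows, and it is bounded above by the finite set of all job names in the pipeline.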