ci/bin: gql: Improve queries for jobs/stages retrieval

Modify the GraphQL query used to fetch all jobs within a pipeline,
switching from collecting jobs out of stage nodes to retrieving them
directly.

The prior query was not paginated, which could overload the server, and
the nested stage-node structure made the results awkward to parse. The
new approach simplifies data interpretation and handles job lists longer
than 100 elements by paginating the query and concatenating the pages
with helper functions (a sketch of the concatenation idea follows the
commit metadata below).

- Transitioned from extracting jobs out of stage nodes to a direct query
  for all jobs in the pipeline, improving data readability and server
  performance.
- With the clearer data from the updated query, removed the
  Dag+JobMetadata tuple, which is now redundant: the refined query
  provides more comprehensive job data, including job name, stage, and
  dependencies.
- The previous query also relied on a GraphQL node that will (or should)
  be paginated eventually anyway.

Closes: #10050
Signed-off-by: Guilherme Gallo <guilherme.gallo@collabora.com>
Part-of: <https://gitlab.freedesktop.org/mesa/mesa/-/merge_requests/25940>
Author: Guilherme Gallo
Date: 2023-10-28 00:45:03 -03:00
Committed by: Marge Bot
Parent: 664e6addea
Commit: 278fc1c22a
3 changed files with 121 additions and 71 deletions
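A minimal sketch of the page-concatenation idea described in the commit
message, assuming a hypothetical merge_job_pages helper; the commit itself
routes pagination through the paginated_key_loc argument of GitlabGQL.query,
visible in the gitlab_gql.py diff below:

    def merge_job_pages(acc: dict, page: dict) -> dict:
        """Concatenate one page of the jobs query onto an accumulated result.

        Hypothetical helper, not from this commit; shown only to illustrate
        how paginated results can be concatenated.
        """
        acc_jobs = acc["project"]["pipeline"]["jobs"]
        page_jobs = page["project"]["pipeline"]["jobs"]
        acc_jobs["nodes"] += page_jobs["nodes"]       # concatenate job nodes
        acc_jobs["pageInfo"] = page_jobs["pageInfo"]  # keep newest cursor/hasNextPage
        return acc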

--- a/bin/ci/ci_run_n_monitor.py
+++ b/bin/ci/ci_run_n_monitor.py

@@ -295,7 +295,7 @@ def parse_args() -> None:
 def find_dependencies(target_jobs_regex: re.Pattern, project_path: str, iid: int) -> set[str]:
     gql_instance = GitlabGQL()
-    dag, _ = create_job_needs_dag(
+    dag = create_job_needs_dag(
         gql_instance, {"projectPath": project_path.path_with_namespace, "iid": iid}
     )
@@ -308,7 +308,10 @@ def find_dependencies(target_jobs_regex: re.Pattern, project_path: str, iid: int
     print()
     print_dag(target_dep_dag)
     print(Fore.RESET)
-    return set(chain.from_iterable(target_dep_dag.values()))
+    dependency_jobs = set(chain.from_iterable(d["needs"] for d in target_dep_dag.values()))
+    target_jobs = set(target_dep_dag.keys())
+    return target_jobs.union(dependency_jobs)


 if __name__ == "__main__":
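A tiny standalone illustration of the new return value of find_dependencies,
the union of the matched target jobs and their needs; the target_dep_dag
literal is invented for the example:

    from itertools import chain

    target_dep_dag = {
        "job3": {"needs": {"job1", "job2"}, "stage": "build", "name": "job3"},
        "job4": {"needs": {"job2"}, "stage": "test", "name": "job4"},
    }
    # union of everything the targets need...
    dependency_jobs = set(chain.from_iterable(d["needs"] for d in target_dep_dag.values()))
    # ...plus the targets themselves
    target_jobs = set(target_dep_dag.keys())
    print(sorted(target_jobs.union(dependency_jobs)))  # ['job1', 'job2', 'job3', 'job4']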

--- a/bin/ci/gitlab_gql.py
+++ b/bin/ci/gitlab_gql.py

@@ -5,13 +5,13 @@ import logging
 import re
 import traceback
 from argparse import ArgumentDefaultsHelpFormatter, ArgumentParser, Namespace
-from collections import OrderedDict, defaultdict
+from collections import OrderedDict
 from copy import deepcopy
 from dataclasses import dataclass, field
 from itertools import accumulate
 from os import getenv
 from pathlib import Path
-from typing import Any, Iterable, Optional, Pattern, Union
+from typing import Any, Iterable, Optional, Pattern, TypedDict, Union

 import yaml
 from filecache import DAY, filecache
@@ -19,7 +19,18 @@ from gql import Client, gql
 from gql.transport.requests import RequestsHTTPTransport
 from graphql import DocumentNode

-Dag = dict[str, set[str]]
+
+class DagNode(TypedDict):
+    needs: set[str]
+    stage: str
+    # `name` is redundant but is kept for backwards compatibility
+    name: str
+
+
+# see the create_job_needs_dag function for more details
+Dag = dict[str, DagNode]
+
 StageSeq = OrderedDict[str, set[str]]
 TOKEN_DIR = Path(getenv("XDG_CONFIG_HOME") or Path.home() / ".config")
@@ -200,104 +211,139 @@ class GitlabGQL:
             logging.warning(f"Could not invalidate cache, maybe it was not used in {ex.args}?")


-def insert_early_stage_jobs(dag: Dag, stage_sequence: StageSeq, jobs_metadata: dict) -> Dag:
-    pre_processed_dag: Dag = {}
+def insert_early_stage_jobs(stage_sequence: StageSeq, jobs_metadata: Dag) -> Dag:
+    pre_processed_dag: dict[str, set[str]] = {}
     jobs_from_early_stages = list(accumulate(stage_sequence.values(), set.union))
-    for job_name, needs in dag.items():
-        final_needs: set = deepcopy(needs)
+    for job_name, metadata in jobs_metadata.items():
+        final_needs: set[str] = deepcopy(metadata["needs"])
         # Pre-process jobs that are not based on needs field
         # e.g. sanity job in mesa MR pipelines
         if not final_needs:
-            job_stage = jobs_metadata[job_name]["stage"]["name"]
-            stage_index = list(stage_sequence.keys()).index(job_stage)
+            job_stage: str = jobs_metadata[job_name]["stage"]
+            stage_index: int = list(stage_sequence.keys()).index(job_stage)
             if stage_index > 0:
                 final_needs |= jobs_from_early_stages[stage_index - 1]
         pre_processed_dag[job_name] = final_needs
-    return pre_processed_dag
+
+    for job_name, needs in pre_processed_dag.items():
+        jobs_metadata[job_name]["needs"] = needs
+
+    return jobs_metadata


-def traverse_dag_needs(dag: Dag) -> None:
-    for job, needs in dag.items():
-        final_needs: set = deepcopy(needs)
+def traverse_dag_needs(jobs_metadata: Dag) -> None:
+    created_jobs = set(jobs_metadata.keys())
+    for job, metadata in jobs_metadata.items():
+        final_needs: set = deepcopy(metadata["needs"]) & created_jobs
         # Post process jobs that are based on needs field
         partial = True

         while partial:
-            next_depth = {n for dn in final_needs for n in dag[dn]}
-            partial = not final_needs.issuperset(next_depth)
+            next_depth: set[str] = {n for dn in final_needs for n in jobs_metadata[dn]["needs"]}
+            partial: bool = not final_needs.issuperset(next_depth)
             final_needs = final_needs.union(next_depth)

-        dag[job] = final_needs
+        jobs_metadata[job]["needs"] = final_needs


-def extract_stages_and_job_needs(pipeline_result: dict[str, Any]) -> tuple[Dag, StageSeq, dict]:
-    incomplete_dag = defaultdict(set)
-    jobs_metadata = {}
+def extract_stages_and_job_needs(
+    pipeline_jobs: dict[str, Any], pipeline_stages: dict[str, Any]
+) -> tuple[StageSeq, Dag]:
+    jobs_metadata = Dag()
     # Record the stage sequence to post process deps that are not based on needs
     # field, for example: sanity job
     stage_sequence: OrderedDict[str, set[str]] = OrderedDict()
-    for stage in pipeline_result["stages"]["nodes"]:
-        stage_jobs: set[str] = set()
-        for stage_job in stage["groups"]["nodes"]:
-            for job in stage_job["jobs"]["nodes"]:
-                stage_jobs.add(job["name"])
-                needs = job.pop("needs")["nodes"]
-                jobs_metadata[job["name"]] = job
-                incomplete_dag[job["name"]] = {node["name"] for node in needs}
-                # ensure that all needed nodes its in the graph
-                [incomplete_dag[node["name"]] for node in needs]
-        stage_sequence[stage["name"]] = stage_jobs
+    for stage in pipeline_stages["nodes"]:
+        stage_sequence[stage["name"]] = set()

-    return incomplete_dag, stage_sequence, jobs_metadata
+    for job in pipeline_jobs["nodes"]:
+        stage_sequence[job["stage"]["name"]].add(job["name"])
+        dag_job: DagNode = {
+            "name": job["name"],
+            "stage": job["stage"]["name"],
+            "needs": set([j["node"]["name"] for j in job["needs"]["edges"]]),
+        }
+        jobs_metadata[job["name"]] = dag_job
+
+    return stage_sequence, jobs_metadata


-def create_job_needs_dag(gl_gql: GitlabGQL, params) -> tuple[Dag, dict[str, dict[str, Any]]]:
+def create_job_needs_dag(gl_gql: GitlabGQL, params, disable_cache: bool = True) -> Dag:
     """
-    The function `create_job_needs_dag` retrieves pipeline details from GitLab, extracts stages and
-    job needs, inserts early stage jobs, and returns the final DAG and job dictionary.
+    This function creates a Directed Acyclic Graph (DAG) to represent a sequence of jobs, where each
+    job has a set of jobs that it depends on (its "needs") and belongs to a certain "stage".
+    The "name" of the job is used as the key in the dictionary.
+
+    For example, consider the following pipeline:
+
+        1. build stage: job1 -> job3
+        2. test stage: job2 -> job4
+
+    - The job needs for job3 are: job1
+    - The job needs for job4 are: job2
+    - job2 has no explicit needs, so it waits for all jobs from the build stage to finish.
+
+    The resulting DAG would look like this:
+
+        dag = {
+            "job1": {"needs": set(), "stage": "build", "name": "job1"},
+            "job2": {"needs": {"job1", "job3"}, "stage": "test", "name": "job2"},
+            "job3": {"needs": {"job1"}, "stage": "build", "name": "job3"},
+            "job4": {"needs": {"job1", "job2", "job3"}, "stage": "test", "name": "job4"},
+        }
+
+    To access the job needs, one can do:
+
+        dag["job3"]["needs"]
+
+    This will return the set of jobs that job3 needs: {"job1"}

     Args:
         gl_gql (GitlabGQL): The `gl_gql` parameter is an instance of the `GitlabGQL` class, which is
             used to make GraphQL queries to the GitLab API.
-        params: The `params` parameter is a dictionary that contains the necessary parameters for
-            the GraphQL query. It is used to specify the details of the pipeline for which the job
-            needs DAG is being created.
+        params (dict): The `params` parameter is a dictionary that contains the necessary parameters
+            for the GraphQL query. It is used to specify the details of the pipeline for which the
+            job needs DAG is being created.
             The specific keys and values in the `params` dictionary will depend on
             the requirements of the GraphQL query being executed.
+        disable_cache (bool): The `disable_cache` parameter is a boolean that specifies whether the
+            query cache should be bypassed. Defaults to True.

     Returns:
-        The function `create_job_needs_dag` returns a tuple containing two elements.
-        The first element is the final DAG (Directed Acyclic Graph) representing the stages and job
-        dependencies.
-        The second element is a dictionary containing information about the jobs in the DAG, where
-        the keys are job names and the values are dictionaries containing additional job
-        information.
+        The final DAG (Directed Acyclic Graph) representing the job dependencies sourced from the
+        needs or stages rule.
     """
-    result = gl_gql.query("pipeline_details.gql", params)
-    pipeline = result["project"]["pipeline"]
-    if not pipeline:
+    stages_jobs_gql = gl_gql.query(
+        "pipeline_details.gql",
+        params=params,
+        paginated_key_loc=["project", "pipeline", "jobs"],
+        disable_cache=disable_cache,
+    )
+    pipeline_data = stages_jobs_gql["project"]["pipeline"]
+    if not pipeline_data:
         raise RuntimeError(f"Could not find any pipelines for {params}")

-    incomplete_dag, stage_sequence, jobs_metadata = extract_stages_and_job_needs(pipeline)
+    stage_sequence, jobs_metadata = extract_stages_and_job_needs(
+        pipeline_data["jobs"], pipeline_data["stages"]
+    )
     # Fill the DAG with the job needs from stages that don't have any needs but still need to wait
     # for previous stages
-    final_dag = insert_early_stage_jobs(incomplete_dag, stage_sequence, jobs_metadata)
+    final_dag = insert_early_stage_jobs(stage_sequence, jobs_metadata)

     # Now that each job has its direct needs filled correctly, update the "needs" field for each job
     # in the DAG by performing a topological traversal
     traverse_dag_needs(final_dag)

-    return final_dag, jobs_metadata
+    return final_dag


 def filter_dag(dag: Dag, regex: Pattern) -> Dag:
-    return {job: needs for job, needs in dag.items() if regex.match(job)}
+    jobs_with_regex: set[str] = {job for job in dag if regex.match(job)}
+    return Dag({job: data for job, data in dag.items() if job in sorted(jobs_with_regex)})


 def print_dag(dag: Dag) -> None:
-    for job, needs in dag.items():
+    for job, data in dag.items():
         print(f"{job}:")
-        print(f"\t{' '.join(needs)}")
+        print(f"\t{' '.join(data['needs'])}")
         print()
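To make the fixed-point expansion in traverse_dag_needs concrete, here is a
standalone rerun of the same loop over an invented three-job DAG (job names
and stages are made up for the example):

    from copy import deepcopy

    jobs = {
        "job1": {"needs": set(), "stage": "build", "name": "job1"},
        "job3": {"needs": {"job1"}, "stage": "build", "name": "job3"},
        "job4": {"needs": {"job3"}, "stage": "test", "name": "job4"},
    }
    for job, metadata in jobs.items():
        # same fixed-point widening as traverse_dag_needs above:
        # keep adding the needs of the needs until nothing new appears
        final_needs = deepcopy(metadata["needs"]) & set(jobs)
        partial = True
        while partial:
            next_depth = {n for dn in final_needs for n in jobs[dn]["needs"]}
            partial = not final_needs.issuperset(next_depth)
            final_needs |= next_depth
        jobs[job]["needs"] = final_needs
    print(sorted(jobs["job4"]["needs"]))  # ['job1', 'job3']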

--- a/bin/ci/pipeline_details.gql
+++ b/bin/ci/pipeline_details.gql

@@ -1,4 +1,4 @@
-query getPipelineDetails($projectPath: ID!, $iid: ID!) {
+query jobs($projectPath: ID!, $iid: ID!, $cursor: String) {
   project(fullPath: $projectPath) {
     id
     pipeline(iid: $iid) {
@@ -8,25 +8,26 @@ query getPipelineDetails($projectPath: ID!, $iid: ID!) {
       stages {
         nodes {
           name
-          groups {
-            nodes {
-              jobs {
-                nodes {
-                  id
-                  name
-                  stage {
-                    name
-                  }
-                  needs {
-                    nodes {
-                      id
-                      name
-                    }
-                  }
-                }
-              }
-            }
-          }
         }
       }
+      jobs(after: $cursor) {
+        pageInfo {
+          hasNextPage
+          endCursor
+        }
+        count
+        nodes {
+          name
+          needs {
+            edges {
+              node {
+                name
+              }
+            }
+          }
+          stage {
+            name
+          }
+        }
+      }
     }
   }
 }
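A sketch of how a caller might drive the new $cursor variable by hand; in the
commit this loop lives behind GitlabGQL.query(..., paginated_key_loc=...), so
the iterate_job_pages function below is an assumption for illustration, built
on plain gql client primitives:

    from gql import Client, gql

    def iterate_job_pages(client: Client, document, project_path: str, iid: str):
        # document is the parsed query, e.g. gql(Path("pipeline_details.gql").read_text())
        cursor = None
        while True:
            result = client.execute(
                document,
                variable_values={"projectPath": project_path, "iid": iid, "cursor": cursor},
            )
            jobs = result["project"]["pipeline"]["jobs"]
            yield from jobs["nodes"]  # one page of job nodes
            if not jobs["pageInfo"]["hasNextPage"]:
                break
            cursor = jobs["pageInfo"]["endCursor"]  # resume after the last job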