ci/bin: gql: Implement pagination

Make query support pagination by supplying the paginated key.

In the following toy example, the paginated key is:
["levels", "cars"]

```graphql
query vehicle_store($location: ID!) {
  levels {
     cars {
        pageInfo {
          hasNextPage
          endCursor
        }
     ...
     }
  }
}
```

Signed-off-by: Guilherme Gallo <guilherme.gallo@collabora.com>
Part-of: <https://gitlab.freedesktop.org/mesa/mesa/-/merge_requests/25940>
This commit is contained in:
Guilherme Gallo
2023-11-06 22:25:40 -03:00
committed by Marge Bot
parent c4b8c03012
commit 664e6addea

View File

@@ -68,16 +68,27 @@ class GitlabGQL:
gql_file: Union[Path, str],
params: dict[str, Any] = {},
operation_name: Optional[str] = None,
paginated_key_loc: Iterable[str] = [],
disable_cache: bool = False,
) -> dict[str, Any]:
def run_uncached() -> dict[str, Any]:
if paginated_key_loc:
return self._sweep_pages(gql_file, params, operation_name, paginated_key_loc)
return self._query(gql_file, params, operation_name)
if disable_cache:
return run_uncached()
try:
return self._query_cached(gql_file, params, operation_name)
# Create an auxiliary variable to deliver a cached result and enable catching exceptions
# Decorate the query to be cached
if paginated_key_loc:
result = self._sweep_pages_cached(
gql_file, params, operation_name, paginated_key_loc
)
else:
result = self._query_cached(gql_file, params, operation_name)
return result # type: ignore
except Exception as ex:
logging.error(f"Cached query failed with {ex}")
# print exception traceback
@@ -108,13 +119,82 @@ class GitlabGQL:
query, variable_values=params, operation_name=operation_name
)
@filecache(DAY)
def _sweep_pages_cached(self, *args, **kwargs):
return self._sweep_pages(*args, **kwargs)
@filecache(DAY)
def _query_cached(self, *args, **kwargs):
return self._query(*args, **kwargs)
def _sweep_pages(
self, query, params, operation_name=None, paginated_key_loc: Iterable[str] = []
) -> dict[str, Any]:
"""
Retrieve paginated data from a GraphQL API and concatenate the results into a single
response.
Args:
query: represents a filepath with the GraphQL query to be executed.
params: a dictionary that contains the parameters to be passed to the query. These
parameters can be used to filter or modify the results of the query.
operation_name: The `operation_name` parameter is an optional parameter that specifies
the name of the GraphQL operation to be executed. It is used when making a GraphQL
query to specify which operation to execute if there are multiple operations defined
in the GraphQL schema. If not provided, the default operation will be executed.
paginated_key_loc (Iterable[str]): The `paginated_key_loc` parameter is an iterable of
strings that represents the location of the paginated field within the response. It
is used to extract the paginated field from the response and append it to the final
result. The node has to be a list of objects with a `pageInfo` field that contains
at least the `hasNextPage` and `endCursor` fields.
Returns:
a dictionary containing the response from the query with the paginated field
concatenated.
"""
def fetch_page(cursor: str | None = None) -> dict[str, Any]:
if cursor:
params["cursor"] = cursor
logging.info(
f"Found more than 100 elements, paginating. "
f"Current cursor at {cursor}"
)
return self._query(query, params, operation_name)
# Execute the initial query
response: dict[str, Any] = fetch_page()
# Initialize an empty list to store the final result
final_partial_field: list[dict[str, Any]] = []
# Loop until all pages have been retrieved
while True:
# Get the partial field to be appended to the final result
partial_field = response
for key in paginated_key_loc:
partial_field = partial_field[key]
# Append the partial field to the final result
final_partial_field += partial_field["nodes"]
# Check if there are more pages to retrieve
page_info = partial_field["pageInfo"]
if not page_info["hasNextPage"]:
break
# Execute the query with the updated cursor parameter
response = fetch_page(page_info["endCursor"])
# Replace the "nodes" field in the original response with the final result
partial_field["nodes"] = final_partial_field
return response
def invalidate_query_cache(self) -> None:
logging.warning("Invalidating query cache")
try:
self._sweep_pages._db.clear()
self._query._db.clear()
except AttributeError as ex:
logging.warning(f"Could not invalidate cache, maybe it was not used in {ex.args}?")