diff --git a/bin/ci/structured_logger.py b/bin/ci/structured_logger.py
new file mode 100644
index 00000000000..14c278f7e97
--- /dev/null
+++ b/bin/ci/structured_logger.py
@@ -0,0 +1,294 @@
+"""
+A structured logging utility supporting multiple data formats such as CSV, JSON,
+and YAML.
+
+The main purpose of this script is to have relevant information available
+in a condensed and easily deserialized form.
+
+This script defines a protocol for different file handling strategies and provides
+implementations for CSV, JSON, and YAML formats. The main class, StructuredLogger,
+allows for easy interaction with log data, enabling users to load, save, increment,
+set, and append fields in the log. The script also includes context managers for
+file locking and editing log data to ensure data integrity and avoid race conditions.
+"""
+
+import json
+import os
+from collections.abc import MutableMapping, MutableSequence
+from contextlib import contextmanager
+from datetime import datetime
+from pathlib import Path
+from typing import Any, Protocol
+
+import fire
+from filelock import FileLock
+
+try:
+    import polars as pl
+
+    CSV_LIB_EXCEPTION = None
+except ImportError as e:
+    CSV_LIB_EXCEPTION: ImportError = e
+
+try:
+    from ruamel.yaml import YAML
+
+    YAML_LIB_EXCEPTION = None
+except ImportError as e:
+    YAML_LIB_EXCEPTION: ImportError = e
+
+
+class ContainerProxy:
+    """
+    A proxy class that wraps a mutable container object (such as a dictionary or
+    a list) and calls a provided save_callback function whenever the container
+    or its contents are changed.
+    """
+    def __init__(self, container, save_callback):
+        self.container = container
+        self.save_callback = save_callback
+
+    def __getitem__(self, key):
+        value = self.container[key]
+        if isinstance(value, (MutableMapping, MutableSequence)):
+            return ContainerProxy(value, self.save_callback)
+        return value
+
+    def __setitem__(self, key, value):
+        self.container[key] = value
+        self.save_callback()
+
+    def __delitem__(self, key):
+        del self.container[key]
+        self.save_callback()
+
+    def __getattr__(self, name):
+        attr = getattr(self.container, name)
+
+        if callable(attr):
+            def wrapper(*args, **kwargs):
+                result = attr(*args, **kwargs)
+                self.save_callback()
+                return result
+
+            return wrapper
+        return attr
+
+    def __iter__(self):
+        return iter(self.container)
+
+    def __len__(self):
+        return len(self.container)
+
+    def __repr__(self):
+        return repr(self.container)
+
+
+class AutoSaveDict(dict):
+    """
+    A subclass of the built-in dict class with additional functionality to
+    automatically save changes to the dictionary. It maintains a timestamp of
+    the last modification and automatically wraps nested mutable containers
+    using ContainerProxy.
+    """
+    timestamp_key = "_timestamp"
+
+    def __init__(self, *args, save_callback, register_timestamp=True, **kwargs):
+        self.save_callback = save_callback
+        self.__register_timestamp = register_timestamp
+        self.__heartbeat()
+        super().__init__(*args, **kwargs)
+        self.__wrap_dictionaries()
+
+    def __heartbeat(self):
+        if self.__register_timestamp:
+            self[AutoSaveDict.timestamp_key] = datetime.now().isoformat()
+
+    def __save(self):
+        self.__heartbeat()
+        self.save_callback()
+
+    def __wrap_dictionaries(self):
+        for key, value in self.items():
+            if isinstance(value, MutableMapping) and not isinstance(
+                value, AutoSaveDict
+            ):
+                self[key] = AutoSaveDict(
+                    value, save_callback=self.save_callback, register_timestamp=False
+                )
+
+    def __setitem__(self, key, value):
+        if isinstance(value, MutableMapping) and not isinstance(value, AutoSaveDict):
+            value = AutoSaveDict(
+                value, save_callback=self.save_callback, register_timestamp=False
+            )
+        super().__setitem__(key, value)
+
+        if self.__register_timestamp and key == AutoSaveDict.timestamp_key:
+            return
+        self.__save()
+
+    def __getitem__(self, key):
+        value = super().__getitem__(key)
+        if isinstance(value, (MutableMapping, MutableSequence)):
+            return ContainerProxy(value, self.__save)
+        return value
+
+    def __delitem__(self, key):
+        super().__delitem__(key)
+        self.__save()
+
+    def pop(self, *args, **kwargs):
+        result = super().pop(*args, **kwargs)
+        self.__save()
+        return result
+
+    def update(self, *args, **kwargs):
+        super().update(*args, **kwargs)
+        self.__wrap_dictionaries()
+        self.__save()
+
+
+class StructuredLoggerStrategy(Protocol):
+    def load_data(self, file_path: Path) -> dict:
+        pass
+
+    def save_data(self, file_path: Path, data: dict) -> None:
+        pass
+
+
+class CSVStrategy:
+    def __init__(self) -> None:
+        if CSV_LIB_EXCEPTION:
+            raise RuntimeError(
+                "Can't parse CSV files. Missing library"
+            ) from CSV_LIB_EXCEPTION
+
+    def load_data(self, file_path: Path) -> dict:
+        dicts: list[dict[str, Any]] = pl.read_csv(
+            file_path, try_parse_dates=True
+        ).to_dicts()
+        data = {}
+        for d in dicts:
+            for k, v in d.items():
+                if k != AutoSaveDict.timestamp_key and k in data:
+                    if isinstance(data[k], list):
+                        data[k].append(v)
+                        continue
+                    data[k] = [data[k], v]
+                else:
+                    data[k] = v
+        return data
+
+    def save_data(self, file_path: Path, data: dict) -> None:
+        pl.DataFrame(data).write_csv(file_path)
+
+
+class JSONStrategy:
+    def load_data(self, file_path: Path) -> dict:
+        return json.loads(file_path.read_text())
+
+    def save_data(self, file_path: Path, data: dict) -> None:
+        with open(file_path, "w") as f:
+            json.dump(data, f, indent=2)
+
+
+class YAMLStrategy:
+    def __init__(self):
+        if YAML_LIB_EXCEPTION:
+            raise RuntimeError(
+                "Can't parse YAML files. Missing library"
+            ) from YAML_LIB_EXCEPTION
+        self.yaml = YAML()
+        self.yaml.indent(sequence=4, offset=2)
+        self.yaml.default_flow_style = False
+        self.yaml.representer.add_representer(AutoSaveDict, self.represent_dict)
+
+    @classmethod
+    def represent_dict(cls, dumper, data):
+        return dumper.represent_mapping("tag:yaml.org,2002:map", data)
+
+    def load_data(self, file_path: Path) -> dict:
+        return self.yaml.load(file_path.read_text())
+
+    def save_data(self, file_path: Path, data: dict) -> None:
+        with open(file_path, "w") as f:
+            self.yaml.dump(data, f)
+
+
+class StructuredLogger:
+    def __init__(
+        self, file_name: str, strategy: StructuredLoggerStrategy = None, truncate=False
+    ):
+        self.file_name: str = file_name
+        self.file_path = Path(self.file_name)
+        self._data: AutoSaveDict = AutoSaveDict(save_callback=self.save_data)
+
+        if strategy is None:
+            self.strategy: StructuredLoggerStrategy = self.guess_strategy_from_file(
+                self.file_path
+            )
+        else:
+            self.strategy = strategy
+
+        if not self.file_path.exists():
+            Path.mkdir(self.file_path.parent, exist_ok=True)
+            self.save_data()
+            return
+
+        if truncate:
+            with self.get_lock():
+                os.truncate(self.file_path, 0)
+                self.save_data()
+
+    def load_data(self):
+        self._data = self.strategy.load_data(self.file_path)
+
+    def save_data(self):
+        self.strategy.save_data(self.file_path, self._data)
+
+    @property
+    def data(self) -> AutoSaveDict:
+        return self._data
+
+    @contextmanager
+    def get_lock(self):
+        with FileLock(f"{self.file_path}.lock", timeout=10):
+            yield
+
+    @contextmanager
+    def edit_context(self):
+        """
+        Context manager that ensures proper loading and saving of log data when
+        performing multiple modifications.
+        """
+        with self.get_lock():
+            try:
+                self.load_data()
+                yield
+            finally:
+                self.save_data()
+
+    @staticmethod
+    def guess_strategy_from_file(file_path: Path) -> StructuredLoggerStrategy:
+        file_extension = file_path.suffix.lower().lstrip(".")
+        return StructuredLogger.get_strategy(file_extension)
+
+    @staticmethod
+    def get_strategy(strategy_name: str) -> StructuredLoggerStrategy:
+        strategies = {
+            "csv": CSVStrategy,
+            "json": JSONStrategy,
+            "yaml": YAMLStrategy,
+            "yml": YAMLStrategy,
+        }
+
+        try:
+            return strategies[strategy_name]()
+        except KeyError as e:
+            raise ValueError(f"Unknown strategy for: {strategy_name}") from e
+
+
+if __name__ == "__main__":
+    fire.Fire(StructuredLogger)
diff --git a/bin/ci/test/test_structured_logger.py b/bin/ci/test/test_structured_logger.py
new file mode 100644
index 00000000000..2ec7fac8335
--- /dev/null
+++ b/bin/ci/test/test_structured_logger.py
@@ -0,0 +1,182 @@
+import json
+from pathlib import Path
+
+import pytest
+from mock import MagicMock, patch
+from structured_logger import (
+    AutoSaveDict,
+    CSVStrategy,
+    JSONStrategy,
+    StructuredLogger,
+    YAMLStrategy,
+)
+
+
+@pytest.fixture(params=[CSVStrategy, JSONStrategy, YAMLStrategy])
+def strategy(request):
+    return request.param
+
+
+@pytest.fixture
+def file_extension(strategy):
+    if strategy == CSVStrategy:
+        return "csv"
+    elif strategy == JSONStrategy:
+        return "json"
+    elif strategy == YAMLStrategy:
+        return "yaml"
+
+
+@pytest.fixture
+def tmp_file(tmp_path):
+    return tmp_path / "test.json"
+
+
+def test_guess_strategy_from_file(tmp_path, strategy, file_extension):
+    file_name = tmp_path / f"test_guess.{file_extension}"
+    Path(file_name).touch()
+    guessed_strategy = StructuredLogger.guess_strategy_from_file(file_name)
+    assert isinstance(guessed_strategy, strategy)
+
+
+def test_get_strategy(strategy, file_extension):
+    result = StructuredLogger.get_strategy(file_extension)
+    assert isinstance(result, strategy)
+
+
+def test_invalid_file_extension(tmp_path):
+    file_name = tmp_path / "test_invalid.xyz"
+    Path(file_name).touch()
+
+    with pytest.raises(ValueError, match="Unknown strategy for: xyz"):
+        StructuredLogger.guess_strategy_from_file(file_name)
+
+
+def test_non_existent_file(tmp_path, strategy, file_extension):
+    file_name = tmp_path / f"non_existent.{file_extension}"
+    logger = StructuredLogger(file_name, strategy())
+
+    assert logger.file_path.exists()
+    assert "_timestamp" in logger._data
+
+
+@pytest.fixture
+def structured_logger_module():
+    with patch.dict("sys.modules", {"polars": None, "ruamel.yaml": None}):
+        import importlib
+
+        import structured_logger
+
+        importlib.reload(structured_logger)
+        yield structured_logger
+
+
+def test_missing_csv_library(tmp_path, structured_logger_module):
+    with pytest.raises(RuntimeError, match="Can't parse CSV files. Missing library"):
+        structured_logger_module.CSVStrategy()
+
+
+def test_missing_yaml_library(tmp_path, structured_logger_module):
+    with pytest.raises(RuntimeError, match="Can't parse YAML files. Missing library"):
+        structured_logger_module.YAMLStrategy()
+
+
+def test_autosavedict_setitem():
+    save_callback = MagicMock()
+    d = AutoSaveDict(save_callback=save_callback)
+    d["key"] = "value"
+    assert d["key"] == "value"
+    save_callback.assert_called_once()
+
+
+def test_autosavedict_delitem():
+    save_callback = MagicMock()
+    d = AutoSaveDict({"key": "value"}, save_callback=save_callback)
+    del d["key"]
+    assert "key" not in d
+    save_callback.assert_called_once()
+
+
+def test_autosavedict_pop():
+    save_callback = MagicMock()
+    d = AutoSaveDict({"key": "value"}, save_callback=save_callback)
+    result = d.pop("key")
+    assert result == "value"
+    assert "key" not in d
+    save_callback.assert_called_once()
+
+
+def test_autosavedict_update():
+    save_callback = MagicMock()
+    d = AutoSaveDict({"key": "old_value"}, save_callback=save_callback)
+    d.update({"key": "new_value"})
+    assert d["key"] == "new_value"
+    save_callback.assert_called_once()
+
+
+def test_structured_logger_setitem(tmp_file):
+    logger = StructuredLogger(tmp_file, JSONStrategy())
+    logger.data["field"] = "value"
+
+    with open(tmp_file, "r") as f:
+        data = json.load(f)
+
+    assert data["field"] == "value"
+
+
+def test_structured_logger_set_recursive(tmp_file):
+    logger = StructuredLogger(tmp_file, JSONStrategy())
+    logger.data["field"] = {"test": True}
+    other = logger.data["field"]
+    other["late"] = True
+
+    with open(tmp_file, "r") as f:
+        data = json.load(f)
+
+    assert data["field"]["test"]
+    assert data["field"]["late"]
+
+
+def test_structured_logger_set_list(tmp_file):
+    logger = StructuredLogger(tmp_file, JSONStrategy())
+    logger.data["field"] = [True]
+    other = logger.data["field"]
+    other.append(True)
+
+    with open(tmp_file, "r") as f:
+        data = json.load(f)
+
+    assert data["field"][0]
+    assert data["field"][1]
+
+
+def test_structured_logger_delitem(tmp_file):
+    logger = StructuredLogger(tmp_file, JSONStrategy())
+    logger.data["field"] = "value"
+    del logger.data["field"]
+
+    with open(tmp_file, "r") as f:
+        data = json.load(f)
+
+    assert "field" not in data
+
+
+def test_structured_logger_pop(tmp_file):
+    logger = StructuredLogger(tmp_file, JSONStrategy())
+    logger.data["field"] = "value"
+    logger.data.pop("field")
+
+    with open(tmp_file, "r") as f:
+        data = json.load(f)
+
+    assert "field" not in data
+
+
+def test_structured_logger_update(tmp_file):
+    logger = StructuredLogger(tmp_file, JSONStrategy())
+    logger.data.update({"field": "value"})
+
+    with open(tmp_file, "r") as f:
+        data = json.load(f)
+
+    assert data["field"] == "value"
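
Usage sketch for the Python API above (a minimal example, not applied by the patch; it assumes the module is importable as structured_logger, that ruamel.yaml is installed, and that the file name and field names are placeholders made up for illustration):

    from structured_logger import StructuredLogger

    # With no explicit strategy, the format is guessed from the suffix
    # ("yaml" maps to YAMLStrategy); the file is created on first use.
    logger = StructuredLogger("ci_job_data.yaml")

    # Single writes persist immediately: AutoSaveDict.__setitem__ calls
    # save_data() and refreshes the _timestamp field.
    logger.data["job_name"] = "example-job"

    # For several related writes, edit_context() takes the file lock,
    # reloads the file, and saves once when the block exits.
    with logger.edit_context():
        logger.data["attempts"] = logger.data.get("attempts", 0) + 1
        logger.data["stages"] = {"build": "ok"}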
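
A sketch of the autosave plumbing on its own (illustration only; unittest.mock stands in for a real save function), showing why ContainerProxy wraps method calls: in-place mutations such as list.append() still trigger the save callback:

    from unittest.mock import MagicMock

    from structured_logger import AutoSaveDict

    save = MagicMock()
    d = AutoSaveDict(save_callback=save, register_timestamp=False)

    d["runs"] = []           # AutoSaveDict.__setitem__ fires the callback
    d["runs"].append("x")    # __getitem__ returns a ContainerProxy; append()
                             # goes through __getattr__'s wrapper and fires again

    assert list(d["runs"]) == ["x"]
    assert save.call_count == 2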
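
A sketch of the row-merging behaviour in CSVStrategy.load_data (illustration only; assumes polars is installed, CSV content is made up): each CSV row becomes one dict via polars' to_dicts(), repeated columns are folded into lists, and _timestamp keeps the value from the last row:

    from pathlib import Path

    from structured_logger import CSVStrategy

    path = Path("example.csv")
    path.write_text("field,_timestamp\n1,2024-01-01T00:00:00\n2,2024-01-02T00:00:00\n")

    data = CSVStrategy().load_data(path)
    assert data["field"] == [1, 2]    # duplicated column becomes a list
    assert "_timestamp" in data       # only the last row's timestamp is kept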