From c31395d6a640c1a254e04196c60672e17c29c43e Mon Sep 17 00:00:00 2001 From: Harsh Vardhan Date: Sun, 11 Feb 2024 23:20:55 +0530 Subject: [PATCH 1/3] Created FileReporter and Parser --- src/nnbench/reporter/file.py | 153 +++++++++++++++++++++++++++++++++++ 1 file changed, 153 insertions(+) create mode 100644 src/nnbench/reporter/file.py diff --git a/src/nnbench/reporter/file.py b/src/nnbench/reporter/file.py new file mode 100644 index 00000000..70abf0ca --- /dev/null +++ b/src/nnbench/reporter/file.py @@ -0,0 +1,153 @@ +from __future__ import annotations + +import os + +from nnbench.reporter.base import BenchmarkReporter +from nnbench.types import BenchmarkRecord + +from abc import ABC, abstractmethod + + +class Parser(ABC): + """Abstract base class for parsing records form file. + + Usage: + ------ + ``` + class MyCustomParser(Parser): + def parse(self, records): + # Implement your custom parsing logic here + ... + # Register your custom parser with a distinct file type + MyCustomParser.register("my_custom_format") + # Usage: + custom_records = ... # Load records in your custom format + append_record_to_records(records, custom_record, "my_custom_format") + ``` + """ + + @abstractmethod + def parse(self, records): + """Parses records and returns a list of parsed data. + + Args: + records: A list or iterator of record strings. + + Returns: + A list of parsed records. + """ + + @classmethod + def register(cls, file_type): + """Registers a parser for a specific file type. + + Args: + file_type: The file type (string) + """ + parsers[file_type] = cls + + @staticmethod + def get_parser(file_type): + """Gets the registered parser for a file type. + + Args: + file_type: The file type (string) + + Returns: + The registered RecordParser, or None if not found. + """ + + +class JsonParser(Parser): + def parse(self, records): + import json + return [json.loads(record) for record in records] + + +class YamlParser(Parser): + def parse(self, records): + import yaml + return [yaml.safe_load(record) for record in records] + + +parsers = {"json": JsonParser, "yaml": YamlParser} + + +def parse_records(records, file_type): + """Parses records based on the specified file type. + + This function retrieves and calls the registered parser for + the given file type. + + Args: + records: A list or iterator of record strings. + file_type: The file type (string). + + Returns: + A list of parsed records. + """ + + parser = Parser.get_parser(file_type) + if parser is None: + raise ValueError(f"Unsupported file type: {file_type}") + + return parser().parse(records) + + +def append_record_to_records(records, record, file_type): + """Appends a record to the list based on the file type. + + This function first parses the record using the appropriate parser + and then appends it to the `records` list. + + Args: + records: A list of parsed records. + record: The record string to append. + file_type: The file type (string). + """ + + parsed_record = parse_records([record], file_type)[0] + records.append(parsed_record) + + +class FileReporter(BenchmarkReporter): + def __init__(self, dir: str): + if not os.path.exists(dir): + self.initialize(dir) + self.dir = dir + + def initialize(self, path: str) -> None: + try: + os.makedirs(path, exist_ok=True) + except OSError as e: + raise ValueError(f"Could not create directory: {path}") from e + + def read(self, file_name: str) -> BenchmarkRecord: + if not self.dir: + raise (f"Directory is not initialized") + file_path = os.path.join(self.dir, file_name) + file_type = file_name.split('.')[1] + try: + with open(file_path) as file: + data = file.read() + parsed_data = parse_records(data, file_type) + return parsed_data + except: + raise ValueError(f"Could not read the file: {file_path}") + + def write(self, file_name, record: BenchmarkRecord) -> None: + if not self.dir: + raise (f"Directory is not initialized") + + file_path = os.path.join(self.dir, file_name) + try: + records = self.read(file_name) + file_type = file_name.split('.')[1] + new_records = append_record_to_records(records, record, file_type) + with open(file_path, 'w') as file: + file.write(new_records) + except: + raise ValueError(f"Could not read the file: {file_path}") + + def finalize(self) -> None: + del self.dir From f8c8371b46b8f80df0ab4bb73257dc9ae3d898f3 Mon Sep 17 00:00:00 2001 From: Harsh Vardhan Date: Mon, 12 Feb 2024 19:57:58 +0530 Subject: [PATCH 2/3] Implemented FileReporter and Parser --- src/nnbench/reporter/__init__.py | 2 + src/nnbench/reporter/file.py | 146 +++++++++++++++++++++---------- 2 files changed, 101 insertions(+), 47 deletions(-) diff --git a/src/nnbench/reporter/__init__.py b/src/nnbench/reporter/__init__.py index 79196fc4..29bcf594 100644 --- a/src/nnbench/reporter/__init__.py +++ b/src/nnbench/reporter/__init__.py @@ -8,10 +8,12 @@ from .base import BenchmarkReporter from .console import ConsoleReporter +from .file import FileReporter # internal, mutable _reporter_registry: dict[str, type[BenchmarkReporter]] = { "console": ConsoleReporter, + "file": FileReporter, } # external, immutable diff --git a/src/nnbench/reporter/file.py b/src/nnbench/reporter/file.py index 70abf0ca..5265f32c 100644 --- a/src/nnbench/reporter/file.py +++ b/src/nnbench/reporter/file.py @@ -1,87 +1,132 @@ from __future__ import annotations import os +from typing import Any, List from nnbench.reporter.base import BenchmarkReporter from nnbench.types import BenchmarkRecord -from abc import ABC, abstractmethod +class Parser: + """The base interface for parsing records form file. -class Parser(ABC): - """Abstract base class for parsing records form file. - Usage: ------ ``` class MyCustomParser(Parser): - def parse(self, records): + def parse_file(self, records): # Implement your custom parsing logic here ... + def write_records(self, records): + # Implement your custom file writing logic here + ... # Register your custom parser with a distinct file type MyCustomParser.register("my_custom_format") # Usage: - custom_records = ... # Load records in your custom format - append_record_to_records(records, custom_record, "my_custom_format") + new_record = ... # Load records in your custom format + append_record_to_records(records, new_record, "my_custom_format") ``` """ - @abstractmethod - def parse(self, records): + def parse_file(self, records: str) -> Any: """Parses records and returns a list of parsed data. Args: - records: A list or iterator of record strings. + ----- + `records:` A list or iterator of record strings. Returns: + -------- A list of parsed records. """ + raise NotImplementedError + + def write_records(self, records: Any[BenchmarkRecord], record: BenchmarkRecord) -> str: + """Appends a record to the existing records based on the file type. + + Args: + ----- + `records:` A list of parsed records. + `record:` The record string to append. + `file_type:` The file type (string). + + Returns: + -------- + A string form of the content to be written in a file. + """ + raise NotImplementedError @classmethod - def register(cls, file_type): + def register(cls, file_type: str) -> None: """Registers a parser for a specific file type. Args: - file_type: The file type (string) + `file_type:` The file type (string) """ parsers[file_type] = cls @staticmethod - def get_parser(file_type): + def get_parser(file_type: str): """Gets the registered parser for a file type. Args: - file_type: The file type (string) + `file_type:` The file type (string) Returns: + -------- The registered RecordParser, or None if not found. """ + return parsers.get(file_type) class JsonParser(Parser): - def parse(self, records): + def parse_file(self, records: str) -> List[dict]: import json - return [json.loads(record) for record in records] + + try: + return json.loads(records if records else "[]") + except json.JSONDecodeError: + raise ValueError("Unexpected records passed") + + def write_records( + self, parsed_records: Any[BenchmarkRecord] | None, record: BenchmarkRecord + ) -> str: + import json + + parsed_records.append(record) + return json.dumps(parsed_records) class YamlParser(Parser): - def parse(self, records): + def parse_file(self, records: str) -> List[dict]: import yaml - return [yaml.safe_load(record) for record in records] + return yaml.safe_load(records) if records else [] + def write_records( + self, parsed_records: Any[BenchmarkRecord] | None, record: BenchmarkRecord + ) -> str: + import yaml + + parsed_records.append(record) + for element in record["benchmarks"]: + element["value"] = float(element["value"]) + return yaml.dump(parsed_records) + + +# Register custom parsers here parsers = {"json": JsonParser, "yaml": YamlParser} -def parse_records(records, file_type): +def parse_records(records: str, file_type: str) -> Any: """Parses records based on the specified file type. This function retrieves and calls the registered parser for the given file type. Args: - records: A list or iterator of record strings. - file_type: The file type (string). + `records:` A list or iterator of record strings. + `file_type:` The file type (string). Returns: A list of parsed records. @@ -91,62 +136,69 @@ def parse_records(records, file_type): if parser is None: raise ValueError(f"Unsupported file type: {file_type}") - return parser().parse(records) + return parser().parse_file(records) -def append_record_to_records(records, record, file_type): +def append_record_to_records(parsed_records: Any, record: BenchmarkRecord, file_type: str) -> str: """Appends a record to the list based on the file type. This function first parses the record using the appropriate parser - and then appends it to the `records` list. + and then appends it to the `parsed_records`. Args: - records: A list of parsed records. - record: The record string to append. - file_type: The file type (string). + `records:` A list of parsed records. + `record:` The record to append. + `file_type:` The file type (string). """ - parsed_record = parse_records([record], file_type)[0] - records.append(parsed_record) + parser = Parser.get_parser(file_type) + if parser is None: + raise ValueError(f"Unsupported file type: {file_type}") + + return parser().write_records(parsed_records, record) class FileReporter(BenchmarkReporter): def __init__(self, dir: str): - if not os.path.exists(dir): - self.initialize(dir) self.dir = dir + if not os.path.exists(dir): + self.initialize() - def initialize(self, path: str) -> None: + def initialize(self) -> None: try: - os.makedirs(path, exist_ok=True) + os.makedirs(self.dir, exist_ok=True) except OSError as e: - raise ValueError(f"Could not create directory: {path}") from e - + self.finalize() + raise ValueError(f"Could not create directory: {self.dir}") from e + def read(self, file_name: str) -> BenchmarkRecord: if not self.dir: - raise (f"Directory is not initialized") + raise BaseException("Directory is not initialized") file_path = os.path.join(self.dir, file_name) - file_type = file_name.split('.')[1] + file_type = file_name.split(".")[1] try: with open(file_path) as file: data = file.read() parsed_data = parse_records(data, file_type) return parsed_data - except: + except FileNotFoundError: raise ValueError(f"Could not read the file: {file_path}") - - def write(self, file_name, record: BenchmarkRecord) -> None: + + def write(self, record: BenchmarkRecord, file_name: str) -> None: if not self.dir: - raise (f"Directory is not initialized") - + raise BaseException("Directory is not initialized") + file_path = os.path.join(self.dir, file_name) + if not os.path.exists(file_path): # Create the file + with open(file_path, "w") as file: + file.write("") try: - records = self.read(file_name) - file_type = file_name.split('.')[1] - new_records = append_record_to_records(records, record, file_type) - with open(file_path, 'w') as file: + parsed_records = self.read(file_name) + file_type = file_name.split(".")[1] + new_records = append_record_to_records(parsed_records, record, file_type) + with open(file_path, "w") as file: file.write(new_records) - except: + except FileNotFoundError: raise ValueError(f"Could not read the file: {file_path}") def finalize(self) -> None: From 3c3b850522e32eb1454a660c5af859f1c3f195e1 Mon Sep 17 00:00:00 2001 From: Harsh Vardhan Date: Tue, 13 Feb 2024 22:25:07 +0530 Subject: [PATCH 3/3] Changed the file loaders value structure in `reporter/file.py` and added changes to read and write methods of base `BenchmarkReporter`. --- examples/prefect/runner.py | 4 +- src/nnbench/reporter/base.py | 6 +- src/nnbench/reporter/file.py | 263 +++++++++++++---------------------- 3 files changed, 104 insertions(+), 169 deletions(-) diff --git a/examples/prefect/runner.py b/examples/prefect/runner.py index df351487..baf2808c 100644 --- a/examples/prefect/runner.py +++ b/examples/prefect/runner.py @@ -1,3 +1,5 @@ +from typing import Any + import numpy as np import training from prefect import flow, get_run_logger, task @@ -11,7 +13,7 @@ class PrefectReporter(reporter.BenchmarkReporter): def __init__(self): self.logger = get_run_logger() - def write(self, record: types.BenchmarkRecord) -> None: + def write(self, record: types.BenchmarkRecord, **kwargs: Any) -> None: self.logger.info(record) diff --git a/src/nnbench/reporter/base.py b/src/nnbench/reporter/base.py index 6bb97fc7..dc328392 100644 --- a/src/nnbench/reporter/base.py +++ b/src/nnbench/reporter/base.py @@ -1,7 +1,7 @@ from __future__ import annotations import re -from typing import Any, Callable, Sequence +from typing import Any, Callable, List, Sequence from tabulate import tabulate @@ -88,13 +88,13 @@ def display( print(tabulate(filtered, headers="keys", tablefmt=self.tablefmt)) - def read(self) -> BenchmarkRecord: + def read(self, **kwargs: Any) -> BenchmarkRecord | List[BenchmarkRecord]: raise NotImplementedError def read_batched(self) -> list[BenchmarkRecord]: raise NotImplementedError - def write(self, record: BenchmarkRecord) -> None: + def write(self, record: BenchmarkRecord, **kwargs: Any) -> None: raise NotImplementedError def write_batched(self, records: Sequence[BenchmarkRecord]) -> None: diff --git a/src/nnbench/reporter/file.py b/src/nnbench/reporter/file.py index 5265f32c..152d1f52 100644 --- a/src/nnbench/reporter/file.py +++ b/src/nnbench/reporter/file.py @@ -1,205 +1,138 @@ from __future__ import annotations +import json import os -from typing import Any, List +from typing import IO, Any, Callable, List from nnbench.reporter.base import BenchmarkReporter from nnbench.types import BenchmarkRecord +ser = Callable[[IO, List[BenchmarkRecord], Any], None] +de = Callable[[IO, dict[str, Any]], List[BenchmarkRecord]] -class Parser: - """The base interface for parsing records form file. - - Usage: - ------ - ``` - class MyCustomParser(Parser): - def parse_file(self, records): - # Implement your custom parsing logic here - ... - def write_records(self, records): - # Implement your custom file writing logic here - ... - # Register your custom parser with a distinct file type - MyCustomParser.register("my_custom_format") - # Usage: - new_record = ... # Load records in your custom format - append_record_to_records(records, new_record, "my_custom_format") - ``` - """ - - def parse_file(self, records: str) -> Any: - """Parses records and returns a list of parsed data. - - Args: - ----- - `records:` A list or iterator of record strings. - - Returns: - -------- - A list of parsed records. - """ - raise NotImplementedError - - def write_records(self, records: Any[BenchmarkRecord], record: BenchmarkRecord) -> str: - """Appends a record to the existing records based on the file type. - - Args: - ----- - `records:` A list of parsed records. - `record:` The record string to append. - `file_type:` The file type (string). - - Returns: - -------- - A string form of the content to be written in a file. - """ - raise NotImplementedError - - @classmethod - def register(cls, file_type: str) -> None: - """Registers a parser for a specific file type. +# A registry of supported file loaders +_file_loaders: dict[str, tuple[ser, de]] = {} - Args: - `file_type:` The file type (string) - """ - parsers[file_type] = cls - @staticmethod - def get_parser(file_type: str): - """Gets the registered parser for a file type. - - Args: - `file_type:` The file type (string) - - Returns: - -------- - The registered RecordParser, or None if not found. - """ - return parsers.get(file_type) +# Register file loaders +def register_file_io(serializer: Callable, deserializer: Callable, file_type: str) -> None: + """ + Registers a serializer and deserializer for a file type. + Args: + ----- + `serializer (Callable):` Defines how records are written to a file. + `deserializer (Callable):` Defines how file contents are converted to `BenchmarkRecord`. + `file_type (str):` File type extension (e.g., ".json", ".yaml"). + """ + _file_loaders[file_type] = (serializer, deserializer) -class JsonParser(Parser): - def parse_file(self, records: str) -> List[dict]: - import json - try: - return json.loads(records if records else "[]") - except json.JSONDecodeError: - raise ValueError("Unexpected records passed") +def _get_file_loader(file_type: str) -> tuple[ser, de]: + """Helps retrieve registered file loaders of the given file_type with error handling""" + file_loaders = _file_loaders.get(file_type) + if not file_loaders: + raise ValueError(f"File loaders for `{file_type}` files does not exist") + return file_loaders - def write_records( - self, parsed_records: Any[BenchmarkRecord] | None, record: BenchmarkRecord - ) -> str: - import json - parsed_records.append(record) - return json.dumps(parsed_records) +# json file loader: +def json_load(fp: IO, options: Any = None) -> List[BenchmarkRecord] | None: + file_content = fp.read() + if file_content: + objs = [ + BenchmarkRecord(context=obj["context"], benchmarks=obj["benchmarks"]) + for obj in json.loads(file_content) + ] + return objs + return None -class YamlParser(Parser): - def parse_file(self, records: str) -> List[dict]: - import yaml +def json_save(fp: IO, records: List[BenchmarkRecord], options: Any = None) -> None: + fp.write(json.dumps(records)) - return yaml.safe_load(records) if records else [] - def write_records( - self, parsed_records: Any[BenchmarkRecord] | None, record: BenchmarkRecord - ) -> str: +# yaml file loader: +def yaml_load(fp: IO, options: Any = None) -> List[BenchmarkRecord] | None: + try: import yaml + except ImportError: + raise ModuleNotFoundError("`pyyaml` is not installed") - parsed_records.append(record) - for element in record["benchmarks"]: - element["value"] = float(element["value"]) - return yaml.dump(parsed_records) + file_content = fp.read() + if file_content: + objs = [ + BenchmarkRecord(context=obj["context"], benchmarks=obj["benchmarks"]) + for obj in yaml.safe_load(file_content) + ] + return objs + return None -# Register custom parsers here -parsers = {"json": JsonParser, "yaml": YamlParser} +def yaml_save(fp: IO, records: List[BenchmarkRecord], options: dict[str, Any] = None) -> None: + try: + import yaml + except ImportError: + raise ModuleNotFoundError("`pyyaml` is not installed") + # To avoid `yaml.safe_dump()` error when trying to write numpy array + for element in records[-1]["benchmarks"]: + element["value"] = float(element["value"]) + yaml.safe_dump(records, fp, **(options or {})) -def parse_records(records: str, file_type: str) -> Any: - """Parses records based on the specified file type. - This function retrieves and calls the registered parser for - the given file type. +# Register json and yaml file loaders +register_file_io(json_save, json_load, file_type="json") +register_file_io(yaml_save, yaml_load, file_type="yaml") - Args: - `records:` A list or iterator of record strings. - `file_type:` The file type (string). - Returns: - A list of parsed records. +class FileReporter(BenchmarkReporter): """ + Reports benchmark results to files in a given directory. - parser = Parser.get_parser(file_type) - if parser is None: - raise ValueError(f"Unsupported file type: {file_type}") - - return parser().parse_file(records) - - -def append_record_to_records(parsed_records: Any, record: BenchmarkRecord, file_type: str) -> str: - """Appends a record to the list based on the file type. - - This function first parses the record using the appropriate parser - and then appends it to the `parsed_records`. + This class implements a `BenchmarkReporter` subclass that persists benchmark + records to files within a specified directory. It supports both reading and + writing records, using file extensions to automatically determine the appropriate + serialization format. Args: - `records:` A list of parsed records. - `record:` The record to append. - `file_type:` The file type (string). - """ - - parser = Parser.get_parser(file_type) - if parser is None: - raise ValueError(f"Unsupported file type: {file_type}") - - return parser().write_records(parsed_records, record) + ----- + directory (str): The directory where benchmark files will be stored. + Raises: + ------- + BaseException: If the directory is not initialized. + """ -class FileReporter(BenchmarkReporter): - def __init__(self, dir: str): - self.dir = dir - if not os.path.exists(dir): + def __init__(self, directory: str): + self.directory = directory + if not os.path.exists(directory): self.initialize() def initialize(self) -> None: - try: - os.makedirs(self.dir, exist_ok=True) - except OSError as e: - self.finalize() - raise ValueError(f"Could not create directory: {self.dir}") from e - - def read(self, file_name: str) -> BenchmarkRecord: - if not self.dir: - raise BaseException("Directory is not initialized") - file_path = os.path.join(self.dir, file_name) + os.makedirs(self.directory, exist_ok=True) + + def read(self, **kwargs: Any) -> List[BenchmarkRecord]: + if not self.directory: + raise BaseException("No directory is initialized") + file_name = str(kwargs["file_name"]) + file_path = os.path.join(self.directory, file_name) file_type = file_name.split(".")[1] - try: - with open(file_path) as file: - data = file.read() - parsed_data = parse_records(data, file_type) - return parsed_data - except FileNotFoundError: - raise ValueError(f"Could not read the file: {file_path}") - - def write(self, record: BenchmarkRecord, file_name: str) -> None: - if not self.dir: - raise BaseException("Directory is not initialized") - - file_path = os.path.join(self.dir, file_name) - if not os.path.exists(file_path): # Create the file + with open(file_path) as file: + return _get_file_loader(file_type)[1](file, {}) + + def write(self, record: BenchmarkRecord, **kwargs: dict[str, Any]) -> None: + if not self.directory: + raise BaseException("No directory is initialized") + file_name = str(kwargs["file_name"]) + file_path = os.path.join(self.directory, file_name) + # Create the file, if not already existing + if not os.path.exists(file_path): with open(file_path, "w") as file: file.write("") - try: - parsed_records = self.read(file_name) - file_type = file_name.split(".")[1] - new_records = append_record_to_records(parsed_records, record, file_type) - with open(file_path, "w") as file: - file.write(new_records) - except FileNotFoundError: - raise ValueError(f"Could not read the file: {file_path}") - - def finalize(self) -> None: - del self.dir + prev_records = self.read(file_name=file_name) + prev_records = prev_records if prev_records else [] + prev_records.append(record) # + file_type = file_name.split(".")[1] + with open(file_path, "w") as file: + _get_file_loader(file_type)[0](file, prev_records, {})