diff --git a/tests/commands/run_command/_utils.py b/tests/commands/run_command/_utils.py index 556229bb..de364fa2 100644 --- a/tests/commands/run_command/_utils.py +++ b/tests/commands/run_command/_utils.py @@ -6,7 +6,7 @@ from vedro.commands import CommandArgumentParser -__all__ = ("tmp_dir", "create_scenario", "arg_parser",) +__all__ = ("tmp_dir", "create_scenario", "arg_parser", "ArgumentParser",) @pytest.fixture() diff --git a/tests/commands/run_command/test_config_validator.py b/tests/commands/run_command/test_config_validator.py new file mode 100644 index 00000000..cdfebda2 --- /dev/null +++ b/tests/commands/run_command/test_config_validator.py @@ -0,0 +1,100 @@ +from pathlib import Path + +import pytest +from baby_steps import given, then, when +from pytest import raises + +from vedro.commands.run_command._config_validator import ConfigValidator +from vedro.core import Config + +from ._utils import tmp_dir + +__all__ = ("tmp_dir",) # fixtures + + +def test_validate(): + with given: + class CustomScenarioDir(Config): + pass + + validator = ConfigValidator(CustomScenarioDir) + + with when: + res = validator.validate() + + with then: + assert res is None + + +def test_validate_with_invalid_scenario_dir(): + with given: + class CustomScenarioDir(Config): + default_scenarios_dir = None + + validator = ConfigValidator(CustomScenarioDir) + + with when, raises(BaseException) as exc: + validator.validate() + + with then: + assert exc.type is TypeError + assert str(exc.value) == ( + "Expected `default_scenarios_dir` to be a Path or str, got (None)" + ) + + +@pytest.mark.usefixtures(tmp_dir.__name__) +def test_validate_with_nonexistent_scenario_dir(): + with given: + class CustomScenarioDir(Config): + default_scenarios_dir = "nonexisting/" + + validator = ConfigValidator(CustomScenarioDir) + + with when, raises(BaseException) as exc: + validator.validate() + + with then: + assert exc.type is FileNotFoundError + + scenarios_dir = Path(CustomScenarioDir.default_scenarios_dir).resolve() + assert str(exc.value) == f"`default_scenarios_dir` ('{scenarios_dir}') does not exist" + + +def test_validate_with_non_directory_scenario_dir(tmp_dir: Path): + with given: + existing_file = tmp_dir / "scenario.py" + existing_file.touch() + + class CustomScenarioDir(Config): + default_scenarios_dir = existing_file + + validator = ConfigValidator(CustomScenarioDir) + + with when, raises(BaseException) as exc: + validator.validate() + + with then: + assert exc.type is NotADirectoryError + assert str(exc.value) == f"`default_scenarios_dir` ('{existing_file}') is not a directory" + + +@pytest.mark.usefixtures(tmp_dir.__name__) +def test_validate_with_scenario_dir_outside_project_dir(): + with given: + class CustomScenarioDir(Config): + default_scenarios_dir = "/tmp" + + validator = ConfigValidator(CustomScenarioDir) + + with when, raises(BaseException) as exc: + validator.validate() + + with then: + assert exc.type is ValueError + + scenario_dir = Path(CustomScenarioDir.default_scenarios_dir).resolve() + assert str(exc.value) == ( + f"`default_scenarios_dir` ('{scenario_dir}') must be inside the project directory " + f"('{Config.project_dir}')" + ) diff --git a/tests/commands/run_command/test_plugin_config_validator.py b/tests/commands/run_command/test_plugin_config_validator.py new file mode 100644 index 00000000..66523b7d --- /dev/null +++ b/tests/commands/run_command/test_plugin_config_validator.py @@ -0,0 +1,143 @@ +from os import linesep + +from baby_steps import given, then, when +from pytest import raises + +from vedro.commands.run_command._plugin_config_validator import PluginConfigValidator +from vedro.core import Plugin, PluginConfig + +from ._utils import tmp_dir + +__all__ = ("tmp_dir",) # fixtures + + +class CustomPlugin(Plugin): + pass + + +class CustomPluginConfig(PluginConfig): + plugin = CustomPlugin + + +def test_validate(): + with given: + class CustomPluginConfigWithDependency(PluginConfig): + plugin = CustomPlugin + depends_on = [CustomPluginConfig] + + validator = PluginConfigValidator() + + with when: + res = validator.validate(CustomPluginConfigWithDependency) + + with then: + assert res is None + + +def test_validate_not_subclass(): + with given: + validator = PluginConfigValidator() + + with when, raises(BaseException) as exc: + validator.validate(object) + + with then: + assert exc.type is TypeError + assert str(exc.value) == ( + "PluginConfig '' must be a subclass of 'vedro.core.PluginConfig'" + ) + + +def test_validate_not_subclass_plugin(): + with given: + class InvalidPluginConfig(PluginConfig): + plugin = object + + validator = PluginConfigValidator() + + with when, raises(BaseException) as exc: + validator.validate(InvalidPluginConfig) + + with then: + assert exc.type is TypeError + assert str(exc.value) == ( + "Attribute 'plugin' in 'InvalidPluginConfig' must be a subclass of 'vedro.core.Plugin'" + ) + + +def test_validate_depends_on_not_sequence(): + with given: + class InvalidPluginConfig(PluginConfig): + plugin = CustomPlugin + depends_on = object() + + validator = PluginConfigValidator() + + with when, raises(BaseException) as exc: + validator.validate(InvalidPluginConfig) + + with then: + assert exc.type is TypeError + assert str(exc.value) == ( + "Attribute 'depends_on' in 'InvalidPluginConfig' plugin must be a list or " + "another sequence type ( provided). " + + linesep.join([ + "Example:", + " @computed", + " def depends_on(cls):", + " return [Config.Plugins.Tagger]" + ]) + ) + + +def test_validate_depends_on_not_subclass(): + with given: + class InvalidPluginConfig(PluginConfig): + plugin = CustomPlugin + depends_on = [object] + + validator = PluginConfigValidator() + + with when, raises(BaseException) as exc: + validator.validate(InvalidPluginConfig) + + with then: + assert exc.type is TypeError + assert str(exc.value) == ( + "Dependency '' in 'depends_on' of 'InvalidPluginConfig' " + "must be a subclass of 'vedro.core.PluginConfig'" + ) + + +def test_validate_unknown_attributes(): + with given: + class InvalidPluginConfig(PluginConfig): + plugin = CustomPlugin + unknown = "unknown" + + validator = PluginConfigValidator() + + with when, raises(BaseException) as exc: + validator.validate(InvalidPluginConfig) + + with then: + assert exc.type is AttributeError + assert str(exc.value) == ( + "InvalidPluginConfig configuration contains unknown attributes: unknown" + ) + + +def test_validate_unknown_attributes_disabled(): + with given: + class InvalidPluginConfig(PluginConfig): + plugin = CustomPlugin + unknown = "unknown" + + validator = PluginConfigValidator(validate_plugins_attrs=False) + + with when: + validator.validate(InvalidPluginConfig) + + with then: + # no exception raised + pass diff --git a/tests/commands/run_command/test_plugin_registrar.py b/tests/commands/run_command/test_plugin_registrar.py new file mode 100644 index 00000000..e6830950 --- /dev/null +++ b/tests/commands/run_command/test_plugin_registrar.py @@ -0,0 +1,211 @@ +from os import linesep +from unittest.mock import MagicMock + +import pytest +from baby_steps import given, then, when +from pytest import raises + +from vedro import computed +from vedro.commands.run_command._plugin_registrar import PluginRegistrar +from vedro.core import Dispatcher, Plugin, PluginConfig + + +@pytest.fixture() +def registrar() -> PluginRegistrar: + return PluginRegistrar() + + +@pytest.fixture() +def dispatcher_() -> Dispatcher: + return MagicMock(spec=Dispatcher) + + +class CustomPlugin(Plugin): + pass + + +class CustomPluginConfig(PluginConfig): + plugin = CustomPlugin + + +def test_register_no_plugins(*, registrar: PluginRegistrar, dispatcher_: Dispatcher): + with given: + plugins = [] + + with when: + res = registrar.register(plugins, dispatcher_) + + with then: + assert res is None + assert dispatcher_.register.mock_calls == [] + + +def test_register_plugins_no_deps(*, registrar: PluginRegistrar, dispatcher_: Dispatcher): + with given: + class AnotherPluginConfig(PluginConfig): + plugin = CustomPlugin + + plugins = [AnotherPluginConfig, CustomPluginConfig] + + with when: + registrar.register(plugins, dispatcher_) + + with then: + assert len(dispatcher_.register.mock_calls) == 2 + + registered_plugin1 = dispatcher_.register.mock_calls[0].args[0] + assert isinstance(registered_plugin1, CustomPlugin) + assert registered_plugin1.config is AnotherPluginConfig + + registered_plugin2 = dispatcher_.register.mock_calls[1].args[0] + assert isinstance(registered_plugin2, CustomPlugin) + assert registered_plugin2.config is CustomPluginConfig + + +def test_register_plugins_skip_disabled(*, registrar: PluginRegistrar, dispatcher_: Dispatcher): + with given: + class AnotherPluginConfig(PluginConfig): + plugin = CustomPlugin + enabled = False + + plugins = [CustomPluginConfig, AnotherPluginConfig] + + with when: + registrar.register(plugins, dispatcher_) + + with then: + assert len(dispatcher_.register.mock_calls) == 1 + + registered_plugin = dispatcher_.register.mock_calls[0].args[0] + assert isinstance(registered_plugin, CustomPlugin) + assert registered_plugin.config is CustomPluginConfig + + +def test_register_plugins_with_deps(*, registrar: PluginRegistrar, dispatcher_: Dispatcher): + with given: + class AnotherPluginConfig(PluginConfig): + plugin = CustomPlugin + depends_on = [CustomPluginConfig] + + plugins = [AnotherPluginConfig, CustomPluginConfig] + + with when: + registrar.register(plugins, dispatcher_) + + with then: + assert len(dispatcher_.register.mock_calls) == 2 + + registered_plugin1 = dispatcher_.register.mock_calls[0].args[0] + assert isinstance(registered_plugin1, CustomPlugin) + assert registered_plugin1.config is CustomPluginConfig + + registered_plugin2 = dispatcher_.register.mock_calls[1].args[0] + assert isinstance(registered_plugin2, CustomPlugin) + assert registered_plugin2.config is AnotherPluginConfig + + +def test_register_plugins_with_deps_with_unknown_plugin(*, registrar: PluginRegistrar, + dispatcher_: Dispatcher): + with given: + class AnotherPluginConfig(PluginConfig): + plugin = CustomPlugin + depends_on = [CustomPluginConfig] + + plugins = [AnotherPluginConfig] + + with when, raises(BaseException) as exc: + registrar.register(plugins, dispatcher_) + + with then: + assert exc.type is ValueError + assert str(exc.value) == ( + "Plugin 'AnotherPluginConfig' depends on unknown plugin 'CustomPluginConfig'" + ) + + +def test_register_plugins_with_deps_with_disabled_plugin(*, registrar: PluginRegistrar, + dispatcher_: Dispatcher): + with given: + class DisabledPluginConfig(PluginConfig): + plugin = CustomPlugin + enabled = False + + class AnotherPluginConfig(PluginConfig): + plugin = CustomPlugin + depends_on = [DisabledPluginConfig] + + plugins = [DisabledPluginConfig, AnotherPluginConfig] + + with when, raises(BaseException) as exc: + registrar.register(plugins, dispatcher_) + + with then: + assert exc.type is ValueError + assert str(exc.value) == ( + "Plugin 'AnotherPluginConfig' depends on disabled plugin 'DisabledPluginConfig'" + ) + + +def test_register_plugins_with_deps_cycle(*, registrar: PluginRegistrar, dispatcher_: Dispatcher): + with given: + class AnotherPluginConfig(PluginConfig): + plugin = CustomPlugin + depends_on = computed(lambda _: [CustomPluginConfig, CyclePluginConfig]) + + class CyclePluginConfig(PluginConfig): + plugin = CustomPlugin + depends_on = [AnotherPluginConfig] + + plugins = [CustomPluginConfig, AnotherPluginConfig, CyclePluginConfig] + + with when, raises(BaseException) as exc: + registrar.register(plugins, dispatcher_) + + with then: + assert exc.type is RuntimeError + assert str(exc.value) == ( + "A cyclic dependency between plugins has been detected. " + f"Any of the following plugins could be referencing each other in a cycle:{linesep}" + f" - AnotherPluginConfig{linesep}" + f" - CyclePluginConfig{linesep}" + "Please review their 'depends_on' references to resolve this issue." + ) + + assert dispatcher_.register.mock_calls == [] + + +def test_register_plugins_complex_dependency_graph(*, registrar: PluginRegistrar, + dispatcher_: Dispatcher): + with given: + class A(PluginConfig): + plugin = CustomPlugin + depends_on = computed(lambda _: [B, C]) + + class B(PluginConfig): + plugin = CustomPlugin + depends_on = computed(lambda _: [D]) + + class C(PluginConfig): + plugin = CustomPlugin + depends_on = computed(lambda _: []) + + class D(PluginConfig): + plugin = CustomPlugin + depends_on = computed(lambda _: [E]) + + class E(PluginConfig): + plugin = CustomPlugin + depends_on = computed(lambda _: []) + + plugins = [A, B, C, D, E] + + with when: + registrar.register(plugins, dispatcher_) + + with then: + assert len(dispatcher_.register.mock_calls) == 5 + + for plugin_config in [C, E, D, B, A]: + registered_plugin = dispatcher_.register.mock_calls.pop(0).args[0] + assert isinstance(registered_plugin, CustomPlugin) + assert registered_plugin.config is plugin_config diff --git a/tests/commands/run_command/test_run_command.py b/tests/commands/run_command/test_run_command.py index c359ebdf..9e8b953a 100644 --- a/tests/commands/run_command/test_run_command.py +++ b/tests/commands/run_command/test_run_command.py @@ -24,71 +24,7 @@ class Terminator(vedro.Config.Plugins.Terminator): pass -async def test_run_command_with_invalid_type_scenario_dir(arg_parser: ArgumentParser): - with given: - class CustomScenarioDir(CustomConfig): - default_scenarios_dir = None - command = RunCommand(CustomScenarioDir, arg_parser) - - with when, raises(BaseException) as exc: - await command.run() - - with then: - assert exc.type is TypeError - assert "default_scenarios_dir" in str(exc.value) - assert "to be a Path" in str(exc.value) - - -async def test_run_command_with_nonexistent_scenario_dir(arg_parser: ArgumentParser): - with given: - class CustomScenarioDir(CustomConfig): - default_scenarios_dir = "nonexisting/" - command = RunCommand(CustomScenarioDir, arg_parser) - - with when, raises(BaseException) as exc: - await command.run() - - with then: - assert exc.type is FileNotFoundError - assert "default_scenarios_dir" in str(exc.value) - assert "does not exist" in str(exc.value) - - -async def test_run_command_with_non_directory_scenario_dir(tmp_dir: Path, - arg_parser: ArgumentParser): - with given: - existing_file = tmp_dir / "scenario.py" - existing_file.touch() - - class CustomScenarioDir(CustomConfig): - default_scenarios_dir = existing_file - command = RunCommand(CustomScenarioDir, arg_parser) - - with when, raises(BaseException) as exc: - await command.run() - - with then: - assert exc.type is NotADirectoryError - assert "default_scenarios_dir" in str(exc.value) - assert "is not a directory" in str(exc.value) - - @pytest.mark.usefixtures(tmp_dir.__name__) -async def test_run_command_with_scenario_dir_outside_project_dir(arg_parser: ArgumentParser): - with given: - class CustomScenarioDir(CustomConfig): - default_scenarios_dir = "/tmp" - command = RunCommand(CustomScenarioDir, arg_parser) - - with when, raises(BaseException) as exc: - await command.run() - - with then: - assert exc.type is ValueError - assert "default_scenarios_dir" in str(exc.value) - assert "must be inside project directory" in str(exc.value) - - async def test_run_command_without_scenarios(arg_parser: ArgumentParser): with given: command = RunCommand(CustomConfig, arg_parser) @@ -125,46 +61,3 @@ async def test_run_command_with_scenarios(tmp_dir: Path, arg_parser: ArgumentPar with then: assert exc.type is SystemExit assert str(exc.value) == "0" - - -async def test_run_command_validate_plugin(tmp_dir: Path, arg_parser: ArgumentParser): - with given: - class ValidConfig(CustomConfig): - validate_plugins_configs = True - - class Plugins(Config.Plugins): - class Terminator(vedro.Config.Plugins.Terminator): - enabled = True - - command = RunCommand(ValidConfig, arg_parser) - create_scenario(tmp_dir, "scenario.py") - - with when, raises(BaseException) as exc: - await command.run() - - with then: - assert exc.type is SystemExit - assert str(exc.value) == "0" - - -@pytest.mark.usefixtures(tmp_dir.__name__) -async def test_run_command_validate_plugin_error(arg_parser: ArgumentParser): - with given: - class InvalidConfig(CustomConfig): - validate_plugins_configs = True - - class Plugins(Config.Plugins): - class Terminator(vedro.Config.Plugins.Terminator): - enabled = True - nonexisting = "nonexisting" - - command = RunCommand(InvalidConfig, arg_parser) - - with when, raises(BaseException) as exc: - await command.run() - - with then: - assert exc.type is AttributeError - assert str(exc.value) == ( - "Terminator configuration contains unknown attributes: nonexisting" - ) diff --git a/tests/events/test_events.py b/tests/events/test_events.py index 08281d14..d8c96f72 100644 --- a/tests/events/test_events.py +++ b/tests/events/test_events.py @@ -52,7 +52,7 @@ class RegisteredEvent(Event): # noqa: F811 pass with then: - assert exception.type is Exception + assert exception.type is RuntimeError assert str(exception.value) == f"Event {RegisteredEvent!r} already registered" diff --git a/vedro/__init__.py b/vedro/__init__.py index de639b3d..7714690b 100644 --- a/vedro/__init__.py +++ b/vedro/__init__.py @@ -2,7 +2,7 @@ from typing import Any from ._catched import catched -from ._config import Config +from ._config import Config, computed from ._context import context from ._interface import Interface from ._main import main @@ -25,7 +25,7 @@ __version__ = version __all__ = ("Scenario", "Interface", "run", "only", "skip", "skip_if", "params", "ensure", - "context", "defer", "defer_global", "Config", "catched", "create_tmp_dir", + "context", "defer", "defer_global", "Config", "computed", "catched", "create_tmp_dir", "create_tmp_file", "attach_artifact", "attach_scenario_artifact", "attach_step_artifact", "attach_global_artifact", "MemoryArtifact", "FileArtifact", "Artifact",) diff --git a/vedro/_config.py b/vedro/_config.py index 53f39cc7..5ed5a58b 100644 --- a/vedro/_config.py +++ b/vedro/_config.py @@ -1,4 +1,5 @@ from asyncio import CancelledError +from typing import Sequence, Type import vedro.core as core import vedro.plugins.artifacted as artifacted @@ -27,6 +28,7 @@ MonotonicScenarioRunner, MonotonicScenarioScheduler, MultiScenarioDiscoverer, + PluginConfig, ScenarioDiscoverer, ScenarioFileFinder, ScenarioFileLoader, @@ -37,6 +39,7 @@ ScenarioScheduler, Singleton, ) +from vedro.core.config_loader import computed from vedro.core.scenario_finder.scenario_file_finder import ( AnyFilter, DunderFilter, @@ -45,15 +48,35 @@ ) from vedro.core.scenario_orderer import StableScenarioOrderer -__all__ = ("Config",) +__all__ = ("Config", "computed",) class Config(core.Config): + """ + Defines the main configuration for the Vedro testing framework. + + This class contains settings for the framework's behavior, such as enabling + plugins, defining factories for core components, and specifying filters + for scenario discovery. + """ - # Validate each plugin's configuration, checking for unknown attributes to prevent errors validate_plugins_configs: bool = True + """ + Whether to validate plugin configurations. + + If set to `True`, the framework will validate plugin configurations to + ensure that no unknown attributes are defined, reducing the likelihood + of errors. + """ class Registry(core.Config.Registry): + """ + Defines factories and singleton instances for core components. + + The `Registry` class is responsible for configuring key components, + such as the scenario finder, loader, scheduler, and runner. + """ + Dispatcher = Singleton[Dispatcher](Dispatcher) ModuleLoader = Factory[ModuleLoader](ModuleFileLoader) @@ -86,18 +109,37 @@ class Registry(core.Config.Registry): )) class Plugins(core.Config.Plugins): + """ + Configuration for enabling and disabling plugins. + + This class contains nested classes for each plugin, where the `enabled` + attribute determines whether the plugin is active. + """ + class Director(director.Director): enabled = True class RichReporter(director.RichReporter): enabled = True + @computed + def depends_on(cls) -> Sequence[Type[PluginConfig]]: + return [Config.Plugins.Director] + class SilentReporter(director.SilentReporter): enabled = True + @computed + def depends_on(cls) -> Sequence[Type[PluginConfig]]: + return [Config.Plugins.Director] + class PyCharmReporter(director.PyCharmReporter): enabled = True + @computed + def depends_on(cls) -> Sequence[Type[PluginConfig]]: + return [Config.Plugins.Director] + class TempKeeper(temp_keeper.TempKeeper): enabled = True @@ -122,6 +164,10 @@ class Skipper(skipper.Skipper): class Slicer(slicer.Slicer): enabled = True + @computed + def depends_on(cls) -> Sequence[Type[PluginConfig]]: + return [Config.Plugins.Skipper] + class Tagger(tagger.Tagger): enabled = True diff --git a/vedro/_main.py b/vedro/_main.py index ea76803c..2bb57755 100644 --- a/vedro/_main.py +++ b/vedro/_main.py @@ -30,6 +30,7 @@ async def main() -> None: :raises FileNotFoundError: If the specified project directory does not exist. :raises NotADirectoryError: If the specified project directory path is not a directory. """ + # TODO: add argv parameter to main function in v2 to make it testable shadow_parser = ArgumentParser(add_help=False, allow_abbrev=False) shadow_parser.add_argument("--project-dir", type=Path, default=Path.cwd()) shadow_args, _ = shadow_parser.parse_known_args() diff --git a/vedro/commands/_cmd_arg_parser.py b/vedro/commands/_cmd_arg_parser.py index f32aacbe..297a96d2 100644 --- a/vedro/commands/_cmd_arg_parser.py +++ b/vedro/commands/_cmd_arg_parser.py @@ -7,10 +7,27 @@ class CommandArgumentParser(ArgumentParser): + """ + Extends the default `argparse.ArgumentParser` to support custom behavior for + parsing command-line arguments. + """ + def parse_known_args(self, args: Optional[Sequence[str]] = None, # type: ignore namespace: Optional[Namespace] = None) -> Tuple[Namespace, List[str]]: + """ + Parse the known arguments and return any unrecognized arguments. + + Overrides the default `parse_known_args` to adjust the default behavior. + If `args` is `None`, the method uses command-line arguments starting from the third + argument (e.g., after `$ prog command`). + + :param args: A sequence of arguments to parse. If `None`, defaults to `sys.argv[2:]`. + :param namespace: An optional `Namespace` object to populate with parsed arguments. + :return: A tuple containing the parsed arguments (`Namespace`) and a list of + unrecognized arguments. + """ if args is None: - # $ prog command <...> + # Use arguments starting after `$ prog command <...>` args = sys.argv[2:] else: args = list(args) @@ -18,10 +35,22 @@ def parse_known_args(self, args: Optional[Sequence[str]] = None, # type: ignore return super().parse_known_args(args, namespace) def add_help_action(self) -> None: + """ + Add the standard help action (`-h` or `--help`) to the parser. + + This action displays the help message and exits the program when invoked. + """ self.add_argument("-h", "--help", action="help", help="Show this help message and exit") # https://github.com/python/cpython/issues/95073 def remove_help_action(self) -> None: + """ + Remove the help action (`-h` or `--help`) from the parser. + + This method iterates through the parser's actions to identify the help action + and removes it. It also clears the corresponding option strings from the + parser's internal `_option_string_actions` dictionary. + """ for action in self._actions: if not isinstance(action, argparse._HelpAction): continue diff --git a/vedro/commands/_command.py b/vedro/commands/_command.py index 8697d117..3e9b252d 100644 --- a/vedro/commands/_command.py +++ b/vedro/commands/_command.py @@ -13,12 +13,9 @@ class Command(ABC): Serves as an abstract base class for defining commands. Commands are operations that can be executed with a specific configuration - and argument parser. Subclasses must implement the `run` method to define - the behavior of the command. - - :param config: The global configuration instance for the command. - :param arg_parser: The argument parser for parsing command-line options. - :param kwargs: Additional keyword arguments for customization. + and argument parser. Subclasses of `Command` must implement the `run` method + to define the behavior of the command. These commands are typically invoked + as part of a CLI or other runtime interface. """ def __init__(self, config: Type[Config], @@ -26,9 +23,9 @@ def __init__(self, config: Type[Config], """ Initialize the Command instance with a configuration and argument parser. - :param config: The global configuration instance. + :param config: The global configuration instance used by the command. :param arg_parser: The argument parser for parsing command-line options. - :param kwargs: Additional keyword arguments for customization. + :param kwargs: Additional keyword arguments for customization, if needed. """ self._config = config self._arg_parser = arg_parser @@ -38,7 +35,10 @@ async def run(self) -> None: """ Execute the command's logic. - Subclasses must implement this method to define the specific behavior - of the command when executed. + Subclasses must override this method to define the specific behavior + of the command. This method should contain the main execution flow + of the command. + + :raises NotImplementedError: If the subclass does not implement this method. """ pass diff --git a/vedro/commands/run_command/_config_validator.py b/vedro/commands/run_command/_config_validator.py new file mode 100644 index 00000000..aff82d8e --- /dev/null +++ b/vedro/commands/run_command/_config_validator.py @@ -0,0 +1,65 @@ +from pathlib import Path + +from vedro.core import Config, ConfigType + +__all__ = ("ConfigValidator",) + + +class ConfigValidator: + """ + Validates the configuration provided to the Vedro framework. + + This class ensures that the configuration values, particularly the default + scenarios directory, adhere to the expected constraints and types. + """ + + def __init__(self, config: ConfigType) -> None: + """ + Initialize the ConfigValidator. + + :param config: The configuration object to validate. + """ + self._config = config + + def validate(self) -> None: + """ + Perform validation on the configuration object. + + Validates the `default_scenarios_dir` attribute, ensuring it is of the correct type, + exists, is a directory, and is within the project directory. + + :raises TypeError: If `default_scenarios_dir` is not a `Path` or `str`. + :raises FileNotFoundError: If `default_scenarios_dir` does not exist. + :raises NotADirectoryError: If `default_scenarios_dir` is not a directory. + :raises ValueError: If `default_scenarios_dir` is outside the project directory. + """ + default_scenarios_dir = self._config.default_scenarios_dir + if default_scenarios_dir == Config.default_scenarios_dir: + # Default value is valid, no further checks needed. + return + + if not isinstance(default_scenarios_dir, (Path, str)): + raise TypeError( + "Expected `default_scenarios_dir` to be a Path or str, " + f"got {type(default_scenarios_dir)} ({default_scenarios_dir!r})" + ) + + scenarios_dir = Path(default_scenarios_dir).resolve() + if not scenarios_dir.exists(): + raise FileNotFoundError( + f"`default_scenarios_dir` ('{scenarios_dir}') does not exist" + ) + + if not scenarios_dir.is_dir(): + raise NotADirectoryError( + f"`default_scenarios_dir` ('{scenarios_dir}') is not a directory" + ) + + try: + # Ensure the scenarios directory is inside the project directory. + scenarios_dir.relative_to(self._config.project_dir) + except ValueError: + raise ValueError( + f"`default_scenarios_dir` ('{scenarios_dir}') must be inside the project directory" + f" ('{self._config.project_dir}')" + ) diff --git a/vedro/commands/run_command/_plugin_config_validator.py b/vedro/commands/run_command/_plugin_config_validator.py new file mode 100644 index 00000000..3c5e21d2 --- /dev/null +++ b/vedro/commands/run_command/_plugin_config_validator.py @@ -0,0 +1,124 @@ +from collections.abc import Sequence +from inspect import isclass +from os import linesep +from typing import Any, Set, Type + +from vedro.core import Plugin, PluginConfig + +__all__ = ("PluginConfigValidator",) + + +class PluginConfigValidator: + """ + Validates plugin configuration classes in Vedro. + + This class ensures that plugin configurations and their dependencies meet the + expected criteria, including type checks, dependency checks, and attribute validation. + """ + + def __init__(self, *, validate_plugins_attrs: bool = True) -> None: + """ + Initialize the PluginConfigValidator. + + :param validate_plugins_attrs: Whether to validate unknown attributes in the plugin + configuration class. Default is True. + """ + self._validate_plugins_attrs = validate_plugins_attrs + + def validate(self, plugin_config: Type[PluginConfig]) -> None: + """ + Validate a plugin configuration class and its dependencies. + + This method checks the following: + - The plugin configuration class is a subclass of `PluginConfig`. + - The `plugin` attribute in the configuration is a subclass of `Plugin`. + - The `depends_on` attribute is a sequence of valid plugin configuration classes. + - Enabled dependencies are also enabled when the current plugin is enabled. + - Optionally, unknown attributes in the plugin configuration class. + + :param plugin_config: The plugin configuration class to validate. + :raises TypeError: If the plugin configuration or its attributes are invalid. + :raises ValueError: If dependencies are not enabled when the current plugin is enabled. + :raises AttributeError: If unknown attributes are found in the plugin configuration. + """ + if not self._is_subclass(plugin_config, PluginConfig): + raise TypeError( + f"PluginConfig '{plugin_config}' must be a subclass of 'vedro.core.PluginConfig'" + ) + + if not self._is_subclass(plugin_config.plugin, Plugin) or (plugin_config.plugin is Plugin): + raise TypeError( + f"Attribute 'plugin' in '{plugin_config.__name__}' must be a subclass of " + "'vedro.core.Plugin'" + ) + + if not isinstance(plugin_config.depends_on, Sequence): + raise TypeError( + f"Attribute 'depends_on' in '{plugin_config.__name__}' plugin must be a list or " + f"another sequence type ({type(plugin_config.depends_on)} provided). " + + linesep.join([ + "Example:", + " @computed", + " def depends_on(cls):", + " return [Config.Plugins.Tagger]" + ]) + ) + + for dep in plugin_config.depends_on: + if not self._is_subclass(dep, PluginConfig): + raise TypeError( + f"Dependency '{dep}' in 'depends_on' of '{plugin_config.__name__}' " + "must be a subclass of 'vedro.core.PluginConfig'" + ) + + if self._validate_plugins_attrs: + self._validate_plugin_config_attrs(plugin_config) + + def _validate_plugin_config_attrs(self, plugin_config: Type[PluginConfig]) -> None: + """ + Validate the attributes of a plugin configuration class. + + Ensures that no unknown attributes are defined in the plugin configuration. + + :param plugin_config: The plugin configuration class to validate. + :raises AttributeError: If unknown attributes are found in the plugin configuration. + """ + unknown_attrs = self._get_attrs(plugin_config) - self._get_parent_attrs(plugin_config) + if unknown_attrs: + attrs = ", ".join(unknown_attrs) + raise AttributeError( + f"{plugin_config.__name__} configuration contains unknown attributes: {attrs}" + ) + + def _is_subclass(self, cls: Any, parent: Any) -> bool: + """ + Check if the given class is a subclass of a specified parent class. + + :param cls: The class to check. + :param parent: The parent class to check against. + :return: True if `cls` is a subclass of `parent`, False otherwise. + """ + return isclass(cls) and issubclass(cls, parent) + + def _get_attrs(self, cls: type) -> Set[str]: + """ + Retrieve the set of attributes for a class. + + :param cls: The class to retrieve attributes for. + :return: A set of attribute names for the class. + """ + return set(vars(cls)) + + def _get_parent_attrs(self, cls: type) -> Set[str]: + """ + Recursively retrieve attributes from parent classes. + + :param cls: The class to retrieve parent attributes for. + :return: A set of attribute names for the parent classes. + """ + attrs = set() + # `object` (the base for all classes) has no __bases__ + for base in cls.__bases__: + attrs |= self._get_attrs(base) + attrs |= self._get_parent_attrs(base) + return attrs diff --git a/vedro/commands/run_command/_plugin_registrar.py b/vedro/commands/run_command/_plugin_registrar.py new file mode 100644 index 00000000..055c0ea5 --- /dev/null +++ b/vedro/commands/run_command/_plugin_registrar.py @@ -0,0 +1,187 @@ +from collections import defaultdict, deque +from os import linesep +from typing import Callable, Dict, Iterable, List, Type, Union + +from vedro.core import Dispatcher, PluginConfig + +from ._plugin_config_validator import PluginConfigValidator + +__all__ = ("PluginRegistrar",) + +PluginConfigValidatorFactory = Union[ + Type[PluginConfigValidator], + Callable[[], PluginConfigValidator] +] + +ResolvedDeps = Dict[Type[PluginConfig], Type[PluginConfig]] + + +class PluginRegistrar: + """ + Manages the registration of plugins and ensures their dependencies are satisfied. + + This class validates plugins, resolves their dependencies, orders them + topologically, and registers them with the provided dispatcher. + """ + + def __init__(self, *, + plugin_config_validator_factory: PluginConfigValidatorFactory = + PluginConfigValidator) -> None: + """ + Initialize the PluginRegistrar. + + :param plugin_config_validator_factory: Factory for creating a `PluginConfigValidator` + instance, used to validate plugin configurations. + """ + self._plugin_config_validator = plugin_config_validator_factory() + + def register(self, plugins: Iterable[Type[PluginConfig]], dispatcher: Dispatcher) -> None: + """ + Register plugins with the dispatcher. + + This method validates, orders, and registers enabled plugins with the dispatcher. + + :param plugins: An iterable of plugin configuration classes. + :param dispatcher: The dispatcher to which the plugins will be registered. + """ + for plugin_config in self._get_ordered_plugins(plugins): + plugin = plugin_config.plugin(config=plugin_config) + dispatcher.register(plugin) + + def _get_ordered_plugins(self, + plugins: Iterable[Type[PluginConfig]]) -> List[Type[PluginConfig]]: + """ + Get a topologically ordered list of enabled plugins. + + This method validates each plugin, filters out disabled plugins, resolves their + dependencies, and returns them in a dependency-respecting order. + + :param plugins: An iterable of plugin configuration classes. + :return: A list of enabled plugin configuration classes in topological order. + """ + enabled_plugins = [] + for plugin_config in plugins: + self._plugin_config_validator.validate(plugin_config) + if plugin_config.enabled: + enabled_plugins.append(plugin_config) + + return self._order_plugins(enabled_plugins, self._resolve_dependencies(plugins)) + + def _resolve_dependencies(self, plugins: Iterable[Type[PluginConfig]]) -> ResolvedDeps: + """ + Resolve dependencies between plugins. + + This method maps each plugin to its dependencies, ensuring that they are satisfied + and enabled. + + :param plugins: An iterable of plugin configuration classes. + :return: A dictionary mapping plugin configuration classes to their resolved dependencies. + :raises ValueError: If a plugin depends on an unknown or disabled plugin. + """ + resolved_deps = {plugin: plugin for plugin in plugins} + + for plugin in plugins: + for dep in plugin.depends_on: + resolved = self._resolve_dependency(dep, resolved_deps) + + if resolved is None: + raise ValueError( + f"Plugin '{plugin.__name__}' depends on unknown plugin '{dep.__name__}'" + ) + + if not resolved.enabled: + raise ValueError( + f"Plugin '{plugin.__name__}' depends on disabled plugin '{dep.__name__}'" + ) + + resolved_deps[dep] = resolved + + return resolved_deps + + def _resolve_dependency(self, dep: Type[PluginConfig], + resolved_deps: ResolvedDeps) -> Union[Type[PluginConfig], None]: + """ + Resolve a single dependency. + + This method attempts to find a match for the dependency in the resolved dependencies. + + :param dep: The plugin configuration class to resolve. + :param resolved_deps: A dictionary of already resolved dependencies. + :return: The resolved plugin configuration class if found, or `None` if not found. + """ + if dep in resolved_deps: + return dep + + for candidate in resolved_deps: + if (candidate.__name__ == dep.__name__) and issubclass(candidate.plugin, dep.plugin): + return candidate + + return None + + def _order_plugins(self, plugins: List[Type[PluginConfig]], + resolved_deps: ResolvedDeps) -> List[Type[PluginConfig]]: + """ + Order the given plugins based on their dependencies using a topological sort. + + This method ensures that plugins are loaded in an order that respects their + dependencies, raising an error if a cyclic dependency is detected. + + :param plugins: A list of enabled plugin configuration classes. + :param resolved_deps: A dictionary mapping plugins to their resolved dependencies. + :return: A list of plugin configuration classes in topological order. + :raises RuntimeError: If a cyclic dependency is detected between plugins. + """ + # adjacency will map each plugin to the list of plugins that depend on it + adjacency: Dict[Type[PluginConfig], List[Type[PluginConfig]]] = defaultdict(list) + # in_degree keeps track of how many direct dependencies each plugin has + in_degree: Dict[Type[PluginConfig], int] = defaultdict(int) + + # First, build the adjacency list and in_degree map. + # For each plugin: + # - Ensure it's represented in in_degree (defaulting to 0). + # - For each dependency, add this plugin to the dependency's adjacency list, + # and increment this plugin's in_degree. + for plugin in plugins: + in_degree[plugin] = in_degree.get(plugin, 0) + for dep in plugin.depends_on: + resolved = resolved_deps[dep] + adjacency[resolved].append(plugin) + in_degree[plugin] += 1 + + # Initialize a queue with all plugins that have an in_degree of 0, + # meaning they have no dependencies or all of their dependencies + # are not in the enabled plugin list. + queue = deque([p for p in plugins if in_degree[p] == 0]) + + # We'll store our ordered result in 'ordered', + # and track the count of visited plugins in 'visited_count' + ordered = [] + visited_count = 0 + + # Standard topological sort process: + # Remove a plugin from the queue, add it to the result list, + # then decrement the in_degree of all its "neighbors" (plugins that depend on it). + # If a neighbor's in_degree drops to 0, add it to the queue. + while queue: + plugin = queue.popleft() + ordered.append(plugin) + visited_count += 1 + + for neighbor in adjacency[plugin]: + in_degree[neighbor] -= 1 + if in_degree[neighbor] == 0: + queue.append(neighbor) + + # If the number of visited plugins doesn't match + # the total number of plugins, we have a cycle. + if visited_count != len(plugins): + problematic_plugins = [p.__name__ for p, deg in in_degree.items() if deg > 0] + bullet_prefix = f"{linesep} - " + raise RuntimeError( + "A cyclic dependency between plugins has been detected. " + "Any of the following plugins could be referencing each other in a cycle:" + f"{bullet_prefix + bullet_prefix.join(problematic_plugins)}{linesep}" + "Please review their 'depends_on' references to resolve this issue." + ) + + return ordered diff --git a/vedro/commands/run_command/_run_command.py b/vedro/commands/run_command/_run_command.py index b3e9b862..55e779b3 100644 --- a/vedro/commands/run_command/_run_command.py +++ b/vedro/commands/run_command/_run_command.py @@ -3,10 +3,11 @@ import warnings from argparse import Namespace from pathlib import Path -from typing import Set, Type +from typing import Callable, Type, Union from vedro import Config -from vedro.core import Dispatcher, MonotonicScenarioRunner, Plugin, PluginConfig +from vedro.core import Config as BaseConfig +from vedro.core import Dispatcher, MonotonicScenarioRunner from vedro.events import ( ArgParsedEvent, ArgParseEvent, @@ -18,137 +19,94 @@ from .._cmd_arg_parser import CommandArgumentParser from .._command import Command +from ._config_validator import ConfigValidator +from ._plugin_config_validator import PluginConfigValidator +from ._plugin_registrar import PluginRegistrar __all__ = ("RunCommand",) +ConfigValidatorFactory = Union[ + Type[ConfigValidator], + Callable[[Type[BaseConfig]], ConfigValidator] +] + +PluginRegistrarFactory = Union[ + Type[PluginRegistrar], + Callable[[], PluginRegistrar] +] + class RunCommand(Command): """ - Represents the "run" command for executing scenarios. - - This command handles the entire lifecycle of scenario execution, including: - - Validating configuration parameters. - - Registering plugins with the dispatcher. - - Parsing command-line arguments. - - Discovering scenarios. - - Scheduling and executing scenarios. - - Dispatching events before and after scenario execution. - - :param config: The global configuration instance. - :param arg_parser: The argument parser for parsing command-line options. + Implements the 'run' command for Vedro. + + This command handles the lifecycle of running scenarios, including configuration + validation, plugin registration, scenario discovery, execution, and reporting. """ - def __init__(self, config: Type[Config], arg_parser: CommandArgumentParser) -> None: + def __init__(self, config: Type[Config], arg_parser: CommandArgumentParser, *, + config_validator_factory: ConfigValidatorFactory = ConfigValidator, + plugin_registrar_factory: PluginRegistrarFactory = PluginRegistrar) -> None: """ - Initialize the RunCommand instance. + Initialize the RunCommand. - :param config: The global configuration instance. - :param arg_parser: The argument parser for parsing command-line options. + :param config: The configuration class for Vedro. + :param arg_parser: The argument parser for parsing command-line arguments. + :param config_validator_factory: Factory for creating a `ConfigValidator` instance. + :param plugin_registrar_factory: Factory for creating a `PluginRegistrar` instance. """ super().__init__(config, arg_parser) - - def _validate_config(self) -> None: - """ - Validate the configuration parameters. - - Ensures that the `default_scenarios_dir` is a valid directory within the - project directory. Raises appropriate exceptions if validation fails. - - :raises TypeError: If `default_scenarios_dir` is not a `Path` or `str`. - :raises FileNotFoundError: If `default_scenarios_dir` does not exist. - :raises NotADirectoryError: If `default_scenarios_dir` is not a directory. - :raises ValueError: If `default_scenarios_dir` is not inside the project directory. - """ - default_scenarios_dir = self._config.default_scenarios_dir - if default_scenarios_dir == Config.default_scenarios_dir: - return - - if not isinstance(default_scenarios_dir, (Path, str)): - raise TypeError( - "Expected `default_scenarios_dir` to be a Path, " - f"got {type(default_scenarios_dir)} ({default_scenarios_dir!r})" - ) - - scenarios_dir = Path(default_scenarios_dir).resolve() - if not scenarios_dir.exists(): - raise FileNotFoundError( - f"`default_scenarios_dir` ('{scenarios_dir}') does not exist" - ) - - if not scenarios_dir.is_dir(): - raise NotADirectoryError( - f"`default_scenarios_dir` ('{scenarios_dir}') is not a directory" + self._config_validator = config_validator_factory(config) + self._plugin_registrar = plugin_registrar_factory( + plugin_config_validator_factory=lambda: PluginConfigValidator( + validate_plugins_attrs=config.validate_plugins_configs # type: ignore ) + ) - try: - scenarios_dir.relative_to(self._config.project_dir) - except ValueError: - raise ValueError( - f"`default_scenarios_dir` ('{scenarios_dir}') must be inside project directory " - f"('{self._config.project_dir}')" - ) - - async def _register_plugins(self, dispatcher: Dispatcher) -> None: + async def run(self) -> None: """ - Register plugins with the dispatcher. + Execute the 'run' command. - Iterates through the configuration's `Plugins` section, validates plugin configurations, - and registers enabled plugins with the dispatcher. + This method validates the configuration, registers plugins, discovers scenarios, + executes scenarios, and generates the report. It also fires various events during + the lifecycle. - :param dispatcher: The dispatcher to register plugins with. - :raises TypeError: If a plugin is not a subclass of `vedro.core.Plugin`. + :raises Exception: If a `SystemExit` exception is encountered during discovery. """ - for _, section in self._config.Plugins.items(): - if not issubclass(section.plugin, Plugin) or (section.plugin is Plugin): - raise TypeError( - f"Plugin {section.plugin} should be subclass of vedro.core.Plugin" - ) + # TODO: move config validation to somewhere else in v2 + # (e.g. to the ConfigLoader) + self._config_validator.validate() # Must be before ConfigLoadedEvent - if self._config.validate_plugins_configs: - self._validate_plugin_config(section) + dispatcher = self._config.Registry.Dispatcher() + self._plugin_registrar.register(self._config.Plugins.values(), dispatcher) - if section.enabled: - plugin = section.plugin(config=section) - dispatcher.register(plugin) + await dispatcher.fire(ConfigLoadedEvent(self._config.path, self._config)) - def _validate_plugin_config(self, plugin_config: Type[PluginConfig]) -> None: - """ - Validate the configuration of a plugin. + args = await self._parse_args(dispatcher) + start_dir = self._get_start_dir(args) - Ensures that the plugin's configuration does not contain unknown attributes. + discoverer = self._config.Registry.ScenarioDiscoverer() - :param plugin_config: The configuration of the plugin. - :raises AttributeError: If the plugin configuration contains unknown attributes. - """ - unknown_attrs = self._get_attrs(plugin_config) - self._get_parent_attrs(plugin_config) - if unknown_attrs: - attrs = ", ".join(unknown_attrs) - raise AttributeError( - f"{plugin_config.__name__} configuration contains unknown attributes: {attrs}" - ) + kwargs = {} + # Backward compatibility (to be removed in v2): + signature = inspect.signature(discoverer.discover) + if "project_dir" in signature.parameters: + kwargs["project_dir"] = self._config.project_dir - def _get_attrs(self, cls: type) -> Set[str]: - """ - Retrieve the set of attributes for a class. + try: + scenarios = await discoverer.discover(start_dir, **kwargs) + except SystemExit as e: + raise Exception(f"SystemExit({e.code}) ⬆") - :param cls: The class to retrieve attributes for. - :return: A set of attribute names for the class. - """ - return set(vars(cls)) + scheduler = self._config.Registry.ScenarioScheduler(scenarios) + await dispatcher.fire(StartupEvent(scheduler)) - def _get_parent_attrs(self, cls: type) -> Set[str]: - """ - Recursively retrieve attributes from parent classes. + runner = self._config.Registry.ScenarioRunner() + if not isinstance(runner, (MonotonicScenarioRunner, DryRunner)): + warnings.warn("Deprecated: custom runners will be removed in v2.0", DeprecationWarning) + report = await runner.run(scheduler) - :param cls: The class to retrieve parent attributes for. - :return: A set of attribute names for the parent classes. - """ - attrs = set() - # `object` (the base for all classes) has no __bases__ - for base in cls.__bases__: - attrs |= self._get_attrs(base) - attrs |= self._get_parent_attrs(base) - return attrs + await dispatcher.fire(CleanupEvent(report)) async def _parse_args(self, dispatcher: Dispatcher) -> Namespace: """ @@ -179,49 +137,6 @@ async def _parse_args(self, dispatcher: Dispatcher) -> Namespace: return args - async def run(self) -> None: - """ - Execute the command lifecycle. - - This method validates the configuration, registers plugins, parses arguments, - discovers scenarios, schedules them, and executes them. - - :raises Exception: If scenario discovery raises a `SystemExit`. - """ - # TODO: move config validation to somewhere else in v2 - self._validate_config() # Must be before ConfigLoadedEvent - - dispatcher = self._config.Registry.Dispatcher() - await self._register_plugins(dispatcher) - - await dispatcher.fire(ConfigLoadedEvent(self._config.path, self._config)) - - args = await self._parse_args(dispatcher) - start_dir = self._get_start_dir(args) - - discoverer = self._config.Registry.ScenarioDiscoverer() - - kwargs = {} - # Backward compatibility (to be removed in v2): - signature = inspect.signature(discoverer.discover) - if "project_dir" in signature.parameters: - kwargs["project_dir"] = self._config.project_dir - - try: - scenarios = await discoverer.discover(start_dir, **kwargs) - except SystemExit as e: - raise Exception(f"SystemExit({e.code}) ⬆") - - scheduler = self._config.Registry.ScenarioScheduler(scenarios) - await dispatcher.fire(StartupEvent(scheduler)) - - runner = self._config.Registry.ScenarioRunner() - if not isinstance(runner, (MonotonicScenarioRunner, DryRunner)): - warnings.warn("Deprecated: custom runners will be removed in v2.0", DeprecationWarning) - report = await runner.run(scheduler) - - await dispatcher.fire(CleanupEvent(report)) - def _get_start_dir(self, args: Namespace) -> Path: """ Determine the starting directory for discovering scenarios. diff --git a/vedro/commands/version_command/_version_command.py b/vedro/commands/version_command/_version_command.py index caac8aca..db1891a2 100644 --- a/vedro/commands/version_command/_version_command.py +++ b/vedro/commands/version_command/_version_command.py @@ -13,15 +13,41 @@ def make_console() -> Console: + """ + Create and configure a Rich Console instance. + + The console is used for outputting text in the terminal. + + :return: A `Console` instance with specific configurations. + """ return Console(highlight=False, force_terminal=True, markup=False, soft_wrap=True) class VersionCommand(Command): + """ + Implements the 'version' command for Vedro. + + This command outputs the current version of Vedro to the console. + """ + def __init__(self, config: Type[Config], arg_parser: CommandArgumentParser, *, console_factory: Callable[[], Console] = make_console) -> None: + """ + Initialize the VersionCommand. + + :param config: The configuration class for Vedro. + :param arg_parser: The argument parser for parsing command-line arguments. + :param console_factory: A callable that returns a `Console` instance for output. + """ super().__init__(config, arg_parser) self._console = console_factory() async def run(self) -> None: + """ + Execute the 'version' command. + + This method parses the command-line arguments and outputs the current version + of Vedro to the console. + """ self._arg_parser.parse_args() self._console.print(f"Vedro {vedro.__version__}", style=Style(color="blue")) diff --git a/vedro/core/_artifacts.py b/vedro/core/_artifacts.py index 443a8b8e..593458cd 100644 --- a/vedro/core/_artifacts.py +++ b/vedro/core/_artifacts.py @@ -9,10 +9,9 @@ class Artifact(ABC): """ The base class for representing artifacts in a system. - An artifact in this context is a piece of data generated during the execution - of a scenario or a step. It can be anything from log files, screenshots, to data dumps. - This class serves as an abstract base class for different types of artifacts, - such as MemoryArtifact and FileArtifact. + An artifact is a piece of data generated during the execution of a scenario or a step. + It can be anything such as log files, screenshots, or data dumps. This class serves as + an abstract base class for other artifact types, such as `MemoryArtifact` and`FileArtifact`. """ pass @@ -20,15 +19,19 @@ class Artifact(ABC): class MemoryArtifact(Artifact): """ Represents an artifact that is stored in memory. + + This class is used for artifacts whose data resides entirely in memory, such as + temporary data buffers, small data blobs, or in-memory-generated files. """ - def __init__(self, name: str, mime_type: str, data: bytes) -> None: + def __init__(self, name: str, mime_type: str, data: bytes) -> None: """ Initialize a MemoryArtifact with a name, MIME type, and data. :param name: The name of the artifact. - :param mime_type: The MIME type of the data. - :param data: The actual data in bytes. + :param mime_type: The MIME type of the data (e.g., "text/plain", "application/json"). + :param data: The actual data of the artifact as a byte sequence. + :raises TypeError: If `data` is not of type `bytes`. """ if not isinstance(data, bytes): raise TypeError("'data' must be of type bytes") @@ -41,7 +44,7 @@ def name(self) -> str: """ Get the name of the artifact. - :return: The name of the artifact. + :return: The name of the artifact as a string. """ return self._name @@ -59,7 +62,7 @@ def data(self) -> bytes: """ Get the data stored in the artifact. - :return: The data as bytes. + :return: The data as a byte sequence. """ return self._data @@ -67,7 +70,8 @@ def __repr__(self) -> str: """ Represent the MemoryArtifact as a string. - :return: A string representation of the MemoryArtifact. + :return: A string representation of the MemoryArtifact, including its name, + MIME type, and size of the data. """ size = len(self._data) return f"{self.__class__.__name__}<{self._name!r}, {self._mime_type!r}, size={size}>" @@ -85,6 +89,9 @@ def __eq__(self, other: Any) -> bool: class FileArtifact(Artifact): """ Represents an artifact that is stored as a file on the filesystem. + + This class is used for artifacts whose data is written to or read from disk, + such as large logs, reports, or exported data files. """ def __init__(self, name: str, mime_type: str, path: Path) -> None: @@ -92,8 +99,9 @@ def __init__(self, name: str, mime_type: str, path: Path) -> None: Initialize a FileArtifact with a name, MIME type, and file path. :param name: The name of the artifact. - :param mime_type: The MIME type of the file. - :param path: The path to the file. + :param mime_type: The MIME type of the file (e.g., "image/png", "text/plain"). + :param path: The path to the file containing the artifact data. + :raises TypeError: If `path` is not an instance of `pathlib.Path`. """ if not isinstance(path, Path): raise TypeError("'path' must be an instance of pathlib.Path") @@ -106,7 +114,7 @@ def name(self) -> str: """ Get the name of the artifact. - :return: The name of the artifact. + :return: The name of the artifact as a string. """ return self._name @@ -124,7 +132,7 @@ def path(self) -> Path: """ Get the file path of the artifact. - :return: The path of the file as a Path object. + :return: The path of the file as a `Path` object. """ return self._path @@ -132,7 +140,8 @@ def __repr__(self) -> str: """ Represent the FileArtifact as a string. - :return: A string representation of the FileArtifact. + :return: A string representation of the FileArtifact, including its name, + MIME type, and file path. """ return f"{self.__class__.__name__}<{self._name!r}, {self._mime_type!r}, {self._path!r}>" diff --git a/vedro/core/_container.py b/vedro/core/_container.py index f5663ad4..7a3318b3 100644 --- a/vedro/core/_container.py +++ b/vedro/core/_container.py @@ -12,37 +12,108 @@ class ConflictError(Exception): + """ + Raised when there is a conflict during the registration of a resolver. + + This exception is raised if a new resolver is attempted to be registered + when one is already registered by another plugin. + """ pass class Container(Generic[T], ABC): + """ + Base class for dependency injection containers. + + A container is responsible for managing the creation and resolution of + objects of a specific type. Subclasses define specific behaviors for + managing resolvers and handling object creation (e.g., singleton or factory). + + :param resolver: The initial resolver function or type for creating objects. + """ + def __init__(self, resolver: FactoryType[T]) -> None: + """ + Initialize the container with the given resolver. + + :param resolver: A callable or type used to create objects of type `T`. + """ self._resolver = resolver self._initial = resolver self._registrant: Union[Plugin, None] = None def _make_conflict_error(self, registrant: Plugin) -> ConflictError: + """ + Create a conflict error indicating a registration conflict. + + :param registrant: The plugin attempting to register the resolver. + :return: A `ConflictError` with details about the conflicting registration. + """ type_ = self.__orig_class__.__args__[0] # type: ignore return ConflictError(f"{registrant} is trying to register {type_.__name__}, " f"but it is already registered by {self._registrant!r}") @abstractmethod def register(self, resolver: FactoryType[T], registrant: Plugin) -> None: + """ + Register a new resolver with the container. + + :param resolver: A callable or type used to create objects of type `T`. + :param registrant: The plugin attempting to register the resolver. + :raises ConflictError: If another resolver is already registered. + """ pass @abstractmethod def resolve(self, *args: Any, **kwargs: Any) -> T: + """ + Resolve and return an object of type `T`. + + Subclasses should define how the object is created or retrieved. + + :param args: Positional arguments to pass to the resolver. + :param kwargs: Keyword arguments to pass to the resolver. + :return: An instance of type `T`. + """ pass def __call__(self, *args: Any, **kwargs: Any) -> T: + """ + Call the container to resolve an object. + + This is a shorthand for calling the `resolve` method. + + :param args: Positional arguments to pass to the resolver. + :param kwargs: Keyword arguments to pass to the resolver. + :return: An instance of type `T`. + """ return self.resolve(*args, **kwargs) def __repr__(self) -> str: + """ + Return a string representation of the container. + + :return: A string describing the container and its resolver. + """ return f"{self.__class__.__name__}({self._resolver!r})" class Factory(Container[T]): + """ + A container that creates a new instance of an object each time it is resolved. + + The `Factory` class manages resolvers that are responsible for creating new + objects of type `T` upon each resolution. + """ + def register(self, resolver: FactoryType[T], registrant: Plugin) -> None: + """ + Register a new resolver with the factory. + + :param resolver: A callable or type used to create objects of type `T`. + :param registrant: The plugin attempting to register the resolver. + :raises ConflictError: If another resolver is already registered. + """ assert isinstance(registrant, Plugin) if self._registrant is not None: @@ -52,15 +123,41 @@ def register(self, resolver: FactoryType[T], registrant: Plugin) -> None: self._registrant = registrant def resolve(self, *args: Any, **kwargs: Any) -> T: + """ + Create and return a new instance of type `T`. + + :param args: Positional arguments to pass to the resolver. + :param kwargs: Keyword arguments to pass to the resolver. + :return: A new instance of type `T`. + """ return self._resolver(*args, **kwargs) class Singleton(Container[T]): + """ + A container that ensures a single instance of an object is created and reused. + + The `Singleton` class manages resolvers that are responsible for creating + objects of type `T`, ensuring that the same instance is returned on each resolution. + """ + def __init__(self, resolver: FactoryType[T]) -> None: + """ + Initialize the singleton container with the given resolver. + + :param resolver: A callable or type used to create objects of type `T`. + """ super().__init__(resolver) self._singleton: Union[None, T] = None def register(self, resolver: FactoryType[T], registrant: Plugin) -> None: + """ + Register a new resolver with the singleton container. + + :param resolver: A callable or type used to create objects of type `T`. + :param registrant: The plugin attempting to register the resolver. + :raises ConflictError: If another resolver is already registered. + """ assert isinstance(registrant, Plugin) if self._registrant is not None: @@ -70,6 +167,16 @@ def register(self, resolver: FactoryType[T], registrant: Plugin) -> None: self._registrant = registrant def resolve(self, *args: Any, **kwargs: Any) -> T: + """ + Resolve and return the singleton instance of type `T`. + + If the singleton instance has not been created yet, it will be created + using the resolver. Subsequent calls will return the same instance. + + :param args: Positional arguments to pass to the resolver. + :param kwargs: Keyword arguments to pass to the resolver. + :return: The singleton instance of type `T`. + """ if self._singleton is None: self._singleton = self._resolver(*args, **kwargs) return self._singleton diff --git a/vedro/core/_event.py b/vedro/core/_event.py index 0f5b4797..f06c3e6a 100644 --- a/vedro/core/_event.py +++ b/vedro/core/_event.py @@ -4,22 +4,61 @@ class EventRegistry: + """ + Maintains a registry of all event classes. + + This class provides methods to register new event classes and check if an + event class is already registered. It ensures that event names are unique + within the system. + """ + events: Set[str] = set() @classmethod def register(cls, event: Type["Event"]) -> None: + """ + Register a new event class in the registry. + + :param event: The event class to register. + """ cls.events.add(event.__name__) @classmethod def is_registered(cls, event: Type["Event"]) -> bool: + """ + Check if an event class is already registered. + + :param event: The event class to check. + :return: True if the event is registered, otherwise False. + """ return event.__name__ in cls.events class Event: + """ + Serves as a base class for all events. + + This class ensures that every event subclass is uniquely registered + in the `EventRegistry`. It also provides equality checks based on + the attributes of the event. + """ + def __init_subclass__(cls, **kwargs: Any) -> None: + """ + Perform subclass registration and ensure unique event names. + + :raises RuntimeError: If the event class is already registered. + """ if EventRegistry.is_registered(cls): - raise Exception(f"Event {cls} already registered") + raise RuntimeError(f"Event {cls} already registered") EventRegistry.register(cls) def __eq__(self, other: Any) -> bool: + """ + Check for equality between two event instances. + + :param other: The object to compare with. + :return: True if both objects are of the same class and have + the same attributes, otherwise False. + """ return isinstance(other, self.__class__) and (self.__dict__ == other.__dict__) diff --git a/vedro/core/_exc_info.py b/vedro/core/_exc_info.py index 2e114a5c..8b0bd7ae 100644 --- a/vedro/core/_exc_info.py +++ b/vedro/core/_exc_info.py @@ -6,11 +6,11 @@ class ExcInfo: """ - Represent exception information. + Represents exception information. This class encapsulates the details of an exception, including its type, the exception instance itself, and the traceback associated with the exception. It provides a structured - way to store and access exception information. + way to store and access exception details. """ def __init__(self, @@ -20,9 +20,10 @@ def __init__(self, """ Initialize an instance of ExcInfo with exception details. - :param type_: The type of the exception. - :param value: The exception instance. - :param traceback: The traceback object associated with the exception. + :param type_: The type of the exception (e.g., `ValueError`, `TypeError`). + :param value: The exception instance (i.e., the exception object raised). + :param traceback: The traceback object associated with the exception, representing + the call stack at the point where the exception occurred. """ self.type = type_ self.value = value @@ -32,6 +33,6 @@ def __repr__(self) -> str: """ Return a string representation of the ExcInfo instance. - :return: A string representation of the ExcInfo instance. + :return: A string containing the exception type, value, and traceback. """ return f"{self.__class__.__name__}({self.type!r}, {self.value!r}, {self.traceback!r})" diff --git a/vedro/core/_plugin.py b/vedro/core/_plugin.py index 1468c2b3..7bc34cef 100644 --- a/vedro/core/_plugin.py +++ b/vedro/core/_plugin.py @@ -1,29 +1,92 @@ -from typing import Type +from typing import Sequence, Type from ._dispatcher import Dispatcher, Subscriber from .config_loader import Section as ConfigSection +from .config_loader import computed __all__ = ("Plugin", "PluginConfig",) class Plugin(Subscriber): + """ + Represents a base class for all plugins in Vedro. + + Plugins define custom behavior by subscribing to events using the `subscribe` method. + Each plugin is associated with a configuration class (`PluginConfig`) that specifies + its settings and dependencies. + """ + def __init__(self, config: Type["PluginConfig"]) -> None: + """ + Initialize the Plugin instance with the given configuration class. + + :param config: The configuration class for the plugin, which must be a subclass + of `PluginConfig`. + :raises TypeError: If the provided `config` is not a subclass of `PluginConfig`. + """ if not issubclass(config, PluginConfig): raise TypeError(f"PluginConfig {config} should be subclass of vedro.core.PluginConfig") self._config = config @property def config(self) -> Type["PluginConfig"]: + """ + Retrieve the configuration class associated with the plugin. + + :return: The configuration class for the plugin. + """ return self._config def subscribe(self, dispatcher: Dispatcher) -> None: + """ + Subscribe the plugin to events using the given dispatcher. + + This method is intended to be overridden by subclasses to define event listeners. + + :param dispatcher: The event dispatcher used to register event listeners. + """ pass def __repr__(self) -> str: + """ + Return a string representation of the Plugin instance. + + :return: A string describing the plugin and its configuration class. + """ return f"{self.__class__.__name__}({self._config.__name__})" class PluginConfig(ConfigSection): + """ + Represents the configuration for a plugin. + + This class defines the settings and dependencies for a plugin, including the plugin class, + a description, whether the plugin is enabled, and any dependencies on other plugins. + """ + plugin: Type[Plugin] = Plugin + """ + The plugin class associated with this configuration. Defaults to the base `Plugin` class. + """ + description: str = "" + """ + A brief description of the plugin. + """ + enabled: bool = True + """ + Specifies whether the plugin is enabled. Defaults to True. + """ + + @computed + def depends_on(cls) -> Sequence[Type["PluginConfig"]]: + """ + Define the dependencies of this plugin configuration. + + This method can be overridden in subclasses to specify other plugin configurations + that this plugin depends on. + + :return: A sequence of plugin configuration classes that this plugin depends on. + """ + return () diff --git a/vedro/core/_step_result.py b/vedro/core/_step_result.py index 51eda0ff..bd6af324 100644 --- a/vedro/core/_step_result.py +++ b/vedro/core/_step_result.py @@ -10,20 +10,25 @@ class StepStatus(Enum): """ - Enumeration of possible states for `StepResult` to indicate the current status - of a test step. + Enumeration of possible states for a `StepResult`. - For more information, refer to https://vedro.io/docs/core/step-status. + Used to indicate the current status of a test step during execution. """ - # Indicates the step is awaiting execution. PENDING = "PENDING" + """ + Indicates the step is awaiting execution. + """ - # Signifies the step has completed successfully. PASSED = "PASSED" + """ + Signifies the step has completed successfully. + """ - # Marks the step as unsuccessful due to an assertion failure or an unexpected error. FAILED = "FAILED" + """ + Marks the step as unsuccessful due to an assertion failure or an unexpected error. + """ class StepResult: diff --git a/vedro/core/config_loader/__init__.py b/vedro/core/config_loader/__init__.py index 66e92369..1adc2b37 100644 --- a/vedro/core/config_loader/__init__.py +++ b/vedro/core/config_loader/__init__.py @@ -1,5 +1,6 @@ from ._config_file_loader import ConfigFileLoader from ._config_loader import ConfigLoader -from ._config_type import Config, ConfigType, Section +from ._config_type import Config, ConfigType, Section, computed -__all__ = ("Config", "Section", "ConfigType", "ConfigLoader", "ConfigFileLoader",) +__all__ = ("Config", "Section", "ConfigType", "computed", + "ConfigLoader", "ConfigFileLoader",) diff --git a/vedro/core/config_loader/_config_type.py b/vedro/core/config_loader/_config_type.py index 8908272d..23ccfdae 100644 --- a/vedro/core/config_loader/_config_type.py +++ b/vedro/core/config_loader/_config_type.py @@ -3,34 +3,89 @@ import cabina -__all__ = ("Config", "Section", "ConfigType",) +__all__ = ("Config", "Section", "ConfigType", "computed",) class Section(cabina.Section): + """ + Represents a base configuration section. + + This class extends `cabina.Section` and is used as a foundation for grouping + related configuration options. Framework-specific configurations can subclass + `Section` to define custom sections. + """ pass class Config(cabina.Config, Section): + """ + Base configuration class for applications. + + The `Config` class provides the foundation for managing configuration options, + such as paths, directories, and custom settings. It is designed to be extended by other + configurations for more specialized use cases. + + This class is built on top of `cabina.Config` and `Section`, inheriting + their capabilities while offering core options such as the project root, + configuration file paths, and default directories. + """ - # Path to the configuration file. By default, it points to the file - # where this class is defined (__file__). At runtime, it is typically - # set to 'vedro.cfg.py' in the project's root directory. path: Path = Path(__file__) + """ + Path to the configuration file. + + By default, this points to the file where the `Config` class is defined + (`__file__`). At runtime, this is typically set to `'vedro.cfg.py'` in + the project's root directory. + """ - # Root directory of the project, used as a base for resolving relative paths - # and locating project files. Defaults to the current working directory but - # can be overridden using the `--project-dir` command-line argument. project_dir: Path = Path.cwd() + """ + Root directory of the project. + + This is used as a base for resolving relative paths and locating project + files. By default, it is set to the current working directory (`cwd`), but + it can be overridden using the `--project-dir` command-line argument. + """ - # Default location for storing scenario files, located in the "scenarios/" - # subdirectory of the project's root. default_scenarios_dir: Union[Path, str] = "scenarios/" + """ + Default location for storing scenario files. + + This defaults to the `"scenarios/"` subdirectory of the project's root directory. + It can be set as a `Path` or a `str` and is used for discovering scenario files. + """ class Registry(Section): + """ + Base section for registry-related configuration. + + Subclasses of `Config` can extend this section to include runtime + registry options, such as factories or dependency injection configurations. + """ pass class Plugins(Section): + """ + Base section for plugins-related configuration. + + Framework-specific configurations can extend this section to manage + plugin settings (e.g., enabling/disabling plugins, setting plugin options). + """ pass ConfigType = Type[Config] +""" +Alias for the type of the `Config` class. + +This can be used for type annotations or runtime type checking. +""" + +computed = cabina.computed +""" +Decorator for computed configuration properties. + +This is a re-export of `cabina.computed`, allowing you to define dynamic +configuration options whose values are computed based on other properties. +""" diff --git a/vedro/core/scenario_discoverer/_multi_scenario_discoverer.py b/vedro/core/scenario_discoverer/_multi_scenario_discoverer.py index f31de87f..a8c7ce8a 100644 --- a/vedro/core/scenario_discoverer/_multi_scenario_discoverer.py +++ b/vedro/core/scenario_discoverer/_multi_scenario_discoverer.py @@ -58,4 +58,12 @@ async def discover(self, root: Path, *, loaded = await self._loader.load(path) for scn in loaded: scenarios.append(create_vscenario(scn, project_dir=project_dir)) - return await self._orderer.sort(scenarios) + + ordered = await self._orderer.sort(scenarios) + if len(scenarios) != len(ordered): + raise ValueError( + f"The scenario orderer returned {len(ordered)} scenarios, " + f"but {len(scenarios)} scenarios were discovered. " + "Please ensure the orderer only reorders scenarios without adding or removing any" + ) + return ordered diff --git a/vedro/core/scenario_finder/_scenario_finder.py b/vedro/core/scenario_finder/_scenario_finder.py index 837cf2d7..d1b01261 100644 --- a/vedro/core/scenario_finder/_scenario_finder.py +++ b/vedro/core/scenario_finder/_scenario_finder.py @@ -17,14 +17,14 @@ class ScenarioFinder(ABC): @abstractmethod async def find(self, root: Path) -> AsyncGenerator[Path, None]: """ - Finds and yields paths to scenario files starting from the given root directory. + Find and yield paths to scenario files starting from the given root directory. This is an abstract method that must be implemented by subclasses. It should define how scenario files are located and yielded from the specified root directory. :param root: The root directory path to start the search for scenario files. :yield: Paths to scenario files found under the root directory. - :raises NotImplementedError: This method should be overridden in subclasses. + :raises NotImplementedError: If the method is not implemented in a subclass. """ - # Next line is just a placeholder due to https://github.com/python/mypy/issues/5070 + # Placeholder due to https://github.com/python/mypy/issues/5070 yield Path() diff --git a/vedro/core/scenario_finder/scenario_file_finder/_any_filter.py b/vedro/core/scenario_finder/scenario_file_finder/_any_filter.py index f5c95c4b..095471c1 100644 --- a/vedro/core/scenario_finder/scenario_file_finder/_any_filter.py +++ b/vedro/core/scenario_finder/scenario_file_finder/_any_filter.py @@ -7,10 +7,27 @@ class AnyFilter(FileFilter): + """ + Combines multiple filters using a logical OR operation. + + This filter passes a path if at least one of the provided filters passes the path. + """ + def __init__(self, filters: List[FileFilter]) -> None: + """ + Initialize the AnyFilter with a list of filters. + + :param filters: A list of FileFilter objects to combine. + """ self._filters = filters def filter(self, path: Path) -> bool: + """ + Check if the given path passes any of the filters. + + :param path: The file or directory path to evaluate. + :return: True if at least one filter passes the path, otherwise False. + """ for filter_ in self._filters: if filter_.filter(path): return True diff --git a/vedro/core/scenario_finder/scenario_file_finder/_dunder_filter.py b/vedro/core/scenario_finder/scenario_file_finder/_dunder_filter.py index 402b72c3..008c09ba 100644 --- a/vedro/core/scenario_finder/scenario_file_finder/_dunder_filter.py +++ b/vedro/core/scenario_finder/scenario_file_finder/_dunder_filter.py @@ -6,7 +6,19 @@ class DunderFilter(FileFilter): + """ + Filters files or directories with names that start and end with double underscores. + + This filter matches paths like "__init__.py" or "__cache__". + """ + def filter(self, path: Path) -> bool: + """ + Check if the given path matches the dunder naming convention. + + :param path: The file or directory path to evaluate. + :return: True if the name starts and ends with double underscores, otherwise False. + """ p = path while p.suffix != "": p = p.with_suffix("") diff --git a/vedro/core/scenario_finder/scenario_file_finder/_ext_filter.py b/vedro/core/scenario_finder/scenario_file_finder/_ext_filter.py index 480df8d4..f64fa4c6 100644 --- a/vedro/core/scenario_finder/scenario_file_finder/_ext_filter.py +++ b/vedro/core/scenario_finder/scenario_file_finder/_ext_filter.py @@ -7,15 +7,36 @@ class ExtFilter(FileFilter): + """ + Filters files based on their extensions. + + This filter allows inclusion or exclusion of files based on their extensions. + """ + def __init__(self, *, only: Optional[List[str]] = None, ignore: Optional[List[str]] = None) -> None: + """ + Initialize the ExtFilter with inclusion or exclusion criteria. + + :param only: A list of extensions to include (e.g., ['.py', '.txt']). + If provided, only files with these extensions will pass. + :param ignore: A list of extensions to exclude (e.g., ['.log', '.tmp']). + If provided, files with these extensions will not pass. + :raises ValueError: If both 'only' and 'ignore' are specified simultaneously. + """ self._only = only or None self._ignore = ignore or None if (self._only is not None) and (self._ignore is not None): raise ValueError("Use 'only' or 'ignore' (not both)") def filter(self, path: Path) -> bool: + """ + Check if the given path passes the extension filter. + + :param path: The file path to evaluate. + :return: False if the file passes the filter criteria, otherwise True. + """ if self._only: for suffix in self._only: if path.name.endswith(suffix): diff --git a/vedro/core/scenario_finder/scenario_file_finder/_file_filter.py b/vedro/core/scenario_finder/scenario_file_finder/_file_filter.py index 0c157b27..b27b4540 100644 --- a/vedro/core/scenario_finder/scenario_file_finder/_file_filter.py +++ b/vedro/core/scenario_finder/scenario_file_finder/_file_filter.py @@ -5,6 +5,19 @@ class FileFilter(ABC): + """ + Abstract base class for file filters. + + This class provides a common interface for filters that determine whether a file + or directory should be included or excluded based on custom logic. + """ + @abstractmethod def filter(self, path: Path) -> bool: + """ + Determine whether the given path passes the filter. + + :param path: The file or directory path to evaluate. + :return: True if the path should be excluded, otherwise False. + """ pass diff --git a/vedro/core/scenario_finder/scenario_file_finder/_hidden_filter.py b/vedro/core/scenario_finder/scenario_file_finder/_hidden_filter.py index 515f7e71..fc703db5 100644 --- a/vedro/core/scenario_finder/scenario_file_finder/_hidden_filter.py +++ b/vedro/core/scenario_finder/scenario_file_finder/_hidden_filter.py @@ -6,5 +6,17 @@ class HiddenFilter(FileFilter): + """ + Filters hidden files and directories. + + This filter matches paths whose names start with a dot (e.g., ".hidden"). + """ + def filter(self, path: Path) -> bool: + """ + Check if the given path is hidden. + + :param path: The file or directory path to evaluate. + :return: True if the name starts with a dot, otherwise False. + """ return path.name.startswith(".") diff --git a/vedro/core/scenario_finder/scenario_file_finder/_scenario_file_finder.py b/vedro/core/scenario_finder/scenario_file_finder/_scenario_file_finder.py index c973c0cf..e7eaf590 100644 --- a/vedro/core/scenario_finder/scenario_file_finder/_scenario_file_finder.py +++ b/vedro/core/scenario_finder/scenario_file_finder/_scenario_file_finder.py @@ -19,7 +19,7 @@ class ScenarioFileFinder(ScenarioFinder): def __init__(self, file_filter: Optional[FileFilter] = None, dir_filter: Optional[FileFilter] = None) -> None: """ - Initializes the ScenarioFileFinder with optional file and directory filters. + Initialize the ScenarioFileFinder with optional file and directory filters. :param file_filter: An optional FileFilter to apply to each file found. If provided, only files that pass the filter will be included. @@ -32,7 +32,7 @@ def __init__(self, file_filter: Optional[FileFilter] = None, async def find(self, root: Path) -> AsyncGenerator[Path, None]: """ - Finds and yields scenario file paths starting from the given root directory. + Find and yield scenario file paths starting from the given root directory. This method traverses the directory tree starting from the specified root. It applies file and directory filters (if provided) to identify relevant scenario files. diff --git a/vedro/core/scenario_orderer/_plain_scenario_orderer.py b/vedro/core/scenario_orderer/_plain_scenario_orderer.py index 8684f10c..519fde81 100644 --- a/vedro/core/scenario_orderer/_plain_scenario_orderer.py +++ b/vedro/core/scenario_orderer/_plain_scenario_orderer.py @@ -8,5 +8,18 @@ # deprecated class PlainScenarioOrderer(ScenarioOrderer): + """ + A simple scenario orderer that preserves the original order of scenarios. + + This orderer does not alter the order of the provided scenarios. It is marked + as deprecated and may be removed in future versions. + """ + async def sort(self, scenarios: List[VirtualScenario]) -> List[VirtualScenario]: + """ + Return the scenarios in their original order. + + :param scenarios: The list of `VirtualScenario` instances to be sorted. + :return: The same list of scenarios without any modifications. + """ return scenarios diff --git a/vedro/core/scenario_orderer/_scenario_orderer.py b/vedro/core/scenario_orderer/_scenario_orderer.py index 1fde929b..fccbe6e8 100644 --- a/vedro/core/scenario_orderer/_scenario_orderer.py +++ b/vedro/core/scenario_orderer/_scenario_orderer.py @@ -7,6 +7,24 @@ class ScenarioOrderer(ABC): + """ + Abstract base class for scenario orderers. + + This class defines the interface for sorting a list of scenarios. Concrete + implementations of this class should define specific ordering logic. + """ + @abstractmethod async def sort(self, scenarios: List[VirtualScenario]) -> List[VirtualScenario]: + """ + Sort the given list of scenarios. + + This is an abstract method that must be implemented by subclasses. It should + return a sorted list of `VirtualScenario` objects based on a specific ordering + logic. + + :param scenarios: A list of `VirtualScenario` instances to be sorted. + :return: A new list of `VirtualScenario` instances in the desired order. + :raises NotImplementedError: If the method is not implemented in a subclass. + """ pass diff --git a/vedro/core/scenario_orderer/_stable_scenario_orderer.py b/vedro/core/scenario_orderer/_stable_scenario_orderer.py index ec799b47..5f7b8629 100644 --- a/vedro/core/scenario_orderer/_stable_scenario_orderer.py +++ b/vedro/core/scenario_orderer/_stable_scenario_orderer.py @@ -32,6 +32,9 @@ async def sort(self, scenarios: List[VirtualScenario]) -> List[VirtualScenario]: """ Sort the scenarios in a stable order based on their file paths. + This method ensures a consistent order of scenarios by using a comparison + key derived from the structure and content of their file paths. + :param scenarios: The list of `VirtualScenario` instances to be sorted. :return: A new list of scenarios sorted in a stable order. """ diff --git a/vedro/core/scenario_result/_aggregated_result.py b/vedro/core/scenario_result/_aggregated_result.py index 26bd950a..a82325da 100644 --- a/vedro/core/scenario_result/_aggregated_result.py +++ b/vedro/core/scenario_result/_aggregated_result.py @@ -8,20 +8,58 @@ class AggregatedResult(ScenarioResult): + """ + Represents an aggregated result of a main scenario and its related scenarios. + + This class extends `ScenarioResult` to include a collection of additional + scenario results. It is typically used to group the results of a main scenario + along with its sub-scenarios or related scenarios. + """ + def __init__(self, scenario: VirtualScenario) -> None: + """ + Initialize the AggregatedResult with a main scenario. + + :param scenario: The main virtual scenario associated with this aggregated result. + """ super().__init__(scenario) self._scenario_results: List[ScenarioResult] = [] @property def scenario_results(self) -> List[ScenarioResult]: + """ + Get the list of aggregated scenario results. + + :return: A list containing all scenario results aggregated in this result. + """ return self._scenario_results[:] def add_scenario_result(self, scenario_result: ScenarioResult) -> None: + """ + Add a scenario result to the aggregation. + + :param scenario_result: The scenario result to add to the aggregation. + """ self._scenario_results.append(scenario_result) @staticmethod def from_existing(main_scenario_result: ScenarioResult, scenario_results: List[ScenarioResult]) -> "AggregatedResult": + """ + Create an `AggregatedResult` instance from an existing main scenario result + and a list of additional scenario results. + + This method copies all properties (e.g., steps, artifacts, extra details, scope) + from the main scenario result and aggregates the additional scenario results. + It also adjusts the `started_at` and `ended_at` timestamps based on the provided + scenario results. + + :param main_scenario_result: The main scenario result to base the aggregation on. + :param scenario_results: A list of additional scenario results to aggregate. + + :return: A new `AggregatedResult` instance. + :raises AssertionError: If the list of `scenario_results` is empty. + """ result = AggregatedResult(main_scenario_result.scenario) if main_scenario_result.is_passed(): @@ -42,7 +80,9 @@ def from_existing(main_scenario_result: ScenarioResult, for extra in main_scenario_result.extra_details: result.add_extra_details(extra) - assert len(scenario_results) > 0 + if len(scenario_results) == 0: + raise ValueError("No scenario results provided for aggregation") + for scenario_result in scenario_results: if scenario_result.started_at is not None: if (result.started_at is None) or (scenario_result.started_at < result.started_at): diff --git a/vedro/core/scenario_result/_scenario_status.py b/vedro/core/scenario_result/_scenario_status.py index e1076121..d3dc0673 100644 --- a/vedro/core/scenario_result/_scenario_status.py +++ b/vedro/core/scenario_result/_scenario_status.py @@ -5,20 +5,28 @@ class ScenarioStatus(Enum): """ - Enumeration of possible states for `ScenarioResult` to indicate the current status - of a test scenario. + Defines the possible states for a `ScenarioResult`. - For more information, refer to https://vedro.io/docs/core/scenario-status. + This enumeration indicates the current status of a test scenario during or after + execution. It provides four distinct states to represent the lifecycle of a scenario. """ - # Indicates the scenario is queued and waiting for execution. PENDING = "PENDING" + """ + Indicates the scenario is queued and waiting for execution. + """ - # Signifies the scenario has completed successfully with all assertions validated. PASSED = "PASSED" + """ + Signifies the scenario has completed successfully with all assertions validated. + """ - # Marks the scenario as unsuccessful due to failed assertions or unexpected errors. FAILED = "FAILED" + """ + Marks the scenario as unsuccessful due to failed assertions or unexpected errors. + """ - # Represents scenarios that are deliberately not executed. SKIPPED = "SKIPPED" + """ + Represents scenarios that are deliberately not executed. + """ diff --git a/vedro/core/scenario_runner/_interrupted.py b/vedro/core/scenario_runner/_interrupted.py index e84abda6..6b1e4add 100644 --- a/vedro/core/scenario_runner/_interrupted.py +++ b/vedro/core/scenario_runner/_interrupted.py @@ -6,50 +6,133 @@ class Interrupted(BaseException): + """ + Base class for interruptions during the execution process. + + This exception is raised to signal that an execution flow has been interrupted. + Subclasses provide specific details for steps, scenarios, or the entire run. + """ pass class StepInterrupted(Interrupted): + """ + Represents an interruption that occurs during the execution of a step. + + This exception provides information about the exception that caused the interruption + and the result of the step execution up to the point of interruption. + """ + def __init__(self, exc_info: ExcInfo, step_result: StepResult) -> None: + """ + Initialize the StepInterrupted exception. + + :param exc_info: The exception information that caused the interruption. + :param step_result: The step result at the time of the interruption. + """ self._exc_info = exc_info self._step_result = step_result @property def exc_info(self) -> ExcInfo: + """ + Get the exception information that caused the interruption. + + :return: The exception information object. + """ return self._exc_info @property def step_result(self) -> StepResult: + """ + Get the step result at the time of the interruption. + + :return: The step result instance. + """ return self._step_result def __repr__(self) -> str: + """ + Return a string representation of the exception. + + :return: A string describing the exception. + """ return f"{self.__class__.__name__}({self._exc_info!r}, {self._step_result!r})" class ScenarioInterrupted(Interrupted): + """ + Represents an interruption that occurs during the execution of a scenario. + + This exception provides information about the exception that caused the interruption + and the result of the scenario execution up to the point of interruption. + """ + def __init__(self, exc_info: ExcInfo, scenario_result: ScenarioResult) -> None: + """ + Initialize the ScenarioInterrupted exception. + + :param exc_info: The exception information that caused the interruption. + :param scenario_result: The scenario result at the time of the interruption. + """ self._exc_info = exc_info self._scenario_result = scenario_result @property def exc_info(self) -> ExcInfo: + """ + Get the exception information that caused the interruption. + + :return: The exception information object. + """ return self._exc_info @property def scenario_result(self) -> ScenarioResult: + """ + Get the scenario result at the time of the interruption. + + :return: The scenario result instance. + """ return self._scenario_result def __repr__(self) -> str: + """ + Return a string representation of the exception. + + :return: A string describing the exception. + """ return f"{self.__class__.__name__}({self._exc_info!r}, {self._scenario_result!r})" class RunInterrupted(Interrupted): + """ + Represents an interruption that occurs during the execution of the entire run. + + This exception provides information about the exception that caused the interruption. + """ + def __init__(self, exc_info: ExcInfo) -> None: + """ + Initialize the RunInterrupted exception. + + :param exc_info: The exception information that caused the interruption. + """ self._exc_info = exc_info @property def exc_info(self) -> ExcInfo: + """ + Get the exception information that caused the interruption. + + :return: The exception information object. + """ return self._exc_info def __repr__(self) -> str: + """ + Return a string representation of the exception. + + :return: A string describing the exception. + """ return f"{self.__class__.__name__}({self._exc_info!r})" diff --git a/vedro/core/scenario_runner/_monotonic_scenario_runner.py b/vedro/core/scenario_runner/_monotonic_scenario_runner.py index fa0d40c5..d7428733 100644 --- a/vedro/core/scenario_runner/_monotonic_scenario_runner.py +++ b/vedro/core/scenario_runner/_monotonic_scenario_runner.py @@ -30,20 +30,51 @@ class MonotonicScenarioRunner(ScenarioRunner): + """ + Represents a scenario runner that executes scenarios in a monotonic order. + + This runner is responsible for managing the execution of scenarios and their steps, + firing events during the process, and handling interruptions. + """ + def __init__(self, dispatcher: Dispatcher, *, interrupt_exceptions: Tuple[Type[BaseException], ...] = ()) -> None: + """ + Initialize the MonotonicScenarioRunner. + + :param dispatcher: The dispatcher used to fire events during the execution process. + :param interrupt_exceptions: A tuple of exceptions that should interrupt the execution. + """ self._dispatcher = dispatcher assert isinstance(interrupt_exceptions, tuple) self._interrupt_exceptions = interrupt_exceptions + (Interrupted,) def _is_interruption(self, exc_info: ExcInfo, exceptions: Tuple[Type[BaseException], ...]) -> bool: + """ + Check if the given exception matches one of the interruption exceptions. + + :param exc_info: The exception information. + :param exceptions: A tuple of exception types to check against. + :return: True if the exception matches one of the interruption exceptions, False otherwise. + """ for exception in exceptions: if isinstance(exc_info.value, exception): return True return False async def run_step(self, step: VirtualStep, ref: Scenario) -> StepResult: + """ + Execute a single step of a scenario. + + This method handles both synchronous and asynchronous steps, fires events + during execution, and manages exceptions raised during the step. + + :param step: The virtual step to be executed. + :param ref: The reference to the scenario instance. + :return: The result of the step execution. + :raises StepInterrupted: If the step execution is interrupted. + """ step_result = StepResult(step) await self._dispatcher.fire(StepRunEvent(step_result)) @@ -71,6 +102,16 @@ async def run_step(self, step: VirtualStep, ref: Scenario) -> StepResult: return step_result async def run_scenario(self, scenario: VirtualScenario) -> ScenarioResult: + """ + Execute a single scenario and its associated steps. + + This method fires events during execution, handles skipped scenarios, + and manages interruptions or failures within the scenario. + + :param scenario: The virtual scenario to be executed. + :return: The result of the scenario execution. + :raises ScenarioInterrupted: If the scenario execution is interrupted. + """ scenario_result = ScenarioResult(scenario) if scenario.is_skipped(): @@ -78,7 +119,7 @@ async def run_scenario(self, scenario: VirtualScenario) -> ScenarioResult: await self._dispatcher.fire(ScenarioSkippedEvent(scenario_result)) return scenario_result - os.chdir(scenario._project_dir) # TODO: do not use private attribute + os.chdir(scenario._project_dir) # TODO: Avoid using private attributes directly await self._dispatcher.fire(ScenarioRunEvent(scenario_result)) scenario_result.set_started_at(time()) @@ -113,11 +154,30 @@ async def run_scenario(self, scenario: VirtualScenario) -> ScenarioResult: async def _report_scenario_results(self, scenario_results: List[ScenarioResult], report: Report, scheduler: ScenarioScheduler) -> None: + """ + Report the results of a scenario's executions. + + This method aggregates the results of the scenario's executions and adds + them to the report. It also fires a `ScenarioReportedEvent`. + + :param scenario_results: A list of scenario results to report. + :param report: The report object to which results are added. + :param scheduler: The scheduler used to aggregate the scenario results. + """ aggregated_result = scheduler.aggregate_results(scenario_results) report.add_result(aggregated_result) await self._dispatcher.fire(ScenarioReportedEvent(aggregated_result)) async def _run_scenarios(self, scheduler: ScenarioScheduler, report: Report) -> None: + """ + Execute all scenarios provided by the scheduler. + + This method manages scenario execution, aggregates results, and handles interruptions. + + :param scheduler: The scheduler providing scenarios to execute. + :param report: The report object to which results are added. + :raises RunInterrupted: If the execution is interrupted by an exception. + """ scenario_results: List[ScenarioResult] = [] async for scenario in scheduler: @@ -144,6 +204,15 @@ async def _run_scenarios(self, scheduler: ScenarioScheduler, report: Report) -> await self._report_scenario_results(scenario_results, report, scheduler) async def run(self, scheduler: ScenarioScheduler) -> Report: + """ + Execute all scenarios and return the final report. + + This method manages the execution of all scenarios provided by the scheduler + and handles any interruptions during the process. + + :param scheduler: The scheduler providing scenarios to execute. + :return: The final report containing all results and any interruption information. + """ report = Report() try: await self._run_scenarios(scheduler, report) diff --git a/vedro/core/scenario_runner/_scenario_runner.py b/vedro/core/scenario_runner/_scenario_runner.py index 400ca0ec..93a10bd0 100644 --- a/vedro/core/scenario_runner/_scenario_runner.py +++ b/vedro/core/scenario_runner/_scenario_runner.py @@ -7,6 +7,23 @@ class ScenarioRunner(ABC): + """ + Defines an abstract base class for a scenario runner. + + A scenario runner is responsible for executing scenarios provided by a scheduler + and producing a report of the results. + """ + @abstractmethod async def run(self, scheduler: ScenarioScheduler) -> Report: + """ + Execute scenarios provided by the scheduler and return a report. + + Subclasses must implement this method to define the execution logic, + including handling scenario execution, managing interruptions, and + aggregating results into the final report. + + :param scheduler: The scheduler providing scenarios to execute. + :return: A report containing the results of all executed scenarios. + """ pass diff --git a/vedro/core/scenario_scheduler/_monotonic_scenario_scheduler.py b/vedro/core/scenario_scheduler/_monotonic_scenario_scheduler.py index 4a6dc130..7afcb2a4 100644 --- a/vedro/core/scenario_scheduler/_monotonic_scenario_scheduler.py +++ b/vedro/core/scenario_scheduler/_monotonic_scenario_scheduler.py @@ -9,27 +9,70 @@ class MonotonicScenarioScheduler(ScenarioScheduler): + """ + Implements a monotonic scenario scheduler. + + This scheduler ensures that scenarios are executed in a monotonic order, based on + their discovery order, and supports scheduling scenarios for repeated execution. + """ + def __init__(self, scenarios: List[VirtualScenario]) -> None: + """ + Initialize the MonotonicScenarioScheduler with the provided scenarios. + + :param scenarios: A list of virtual scenarios to be managed by the scheduler. + """ super().__init__(scenarios) self._scheduled = OrderedDict((k, (v, 0)) for k, v in reversed(self._discovered.items())) self._queue: OrderedDict[str, Tuple[VirtualScenario, int]] = OrderedDict() @property def scheduled(self) -> Iterator[VirtualScenario]: + """ + Get an iterator over the scheduled scenarios. + + Each scenario is yielded once for each time it has been scheduled, plus its + initial occurrence. + + :return: An iterator over the scheduled scenarios. + """ for scenario_id in reversed(self._scheduled): scenario, repeats = self._scheduled[scenario_id] for _ in range(repeats + 1): yield scenario def ignore(self, scenario: VirtualScenario) -> None: + """ + Remove a scenario from the scheduler. + + This method removes the scenario from both the scheduled list and the execution queue. + + :param scenario: The virtual scenario to be ignored. + """ self._scheduled.pop(scenario.unique_id, None) self._queue.pop(scenario.unique_id, None) def __aiter__(self) -> "ScenarioScheduler": + """ + Prepare the scheduler for asynchronous iteration. + + This method resets the queue to match the current scheduled scenarios. + + :return: The scheduler instance itself. + """ self._queue = self._scheduled.copy() return super().__aiter__() async def __anext__(self) -> VirtualScenario: + """ + Retrieve the next scenario to be executed in the queue. + + If the scenario has been scheduled multiple times, it decrements the repeat count + and re-adds the scenario to the queue. + + :return: The next virtual scenario to be executed. + :raises StopAsyncIteration: If no more scenarios are available in the queue. + """ while len(self._queue) > 0: _, (scenario, repeats) = self._queue.popitem() if repeats > 0: @@ -38,6 +81,14 @@ async def __anext__(self) -> VirtualScenario: raise StopAsyncIteration() def schedule(self, scenario: VirtualScenario) -> None: + """ + Schedule a scenario for repeated execution. + + This method increments the repeat count of the scenario in both the scheduled list + and the queue, or adds it as a new scenario if it is not already scheduled. + + :param scenario: The virtual scenario to be scheduled. + """ if scenario.unique_id in self._scheduled: scn, repeats = self._scheduled[scenario.unique_id] scheduled = (scn, repeats + 1) @@ -53,6 +104,17 @@ def schedule(self, scenario: VirtualScenario) -> None: self._queue[scenario.unique_id] = queued def aggregate_results(self, scenario_results: List[ScenarioResult]) -> AggregatedResult: + """ + Aggregate the results of a scenario's executions. + + This method creates an aggregated result for the scenario. If any of the executions failed, + the aggregated result will be marked as failed. Otherwise, the result of the first + execution is used as the base. + + :param scenario_results: A list of scenario results to be aggregated. + :return: An aggregated result representing the combined outcome of the executions. + :raises AssertionError: If the list of scenario results is empty. + """ assert len(scenario_results) > 0 result = scenario_results[0] diff --git a/vedro/core/scenario_scheduler/_scenario_scheduler.py b/vedro/core/scenario_scheduler/_scenario_scheduler.py index 4e85095a..17073eca 100644 --- a/vedro/core/scenario_scheduler/_scenario_scheduler.py +++ b/vedro/core/scenario_scheduler/_scenario_scheduler.py @@ -9,37 +9,108 @@ class ScenarioScheduler(ABC): + """ + Defines an abstract base class for a scenario scheduler. + + A scenario scheduler is responsible for managing the execution of scenarios, + including scheduling, ignoring, and aggregating results. + """ + def __init__(self, scenarios: List[VirtualScenario]) -> None: + """ + Initialize the ScenarioScheduler with a list of discovered scenarios. + + :param scenarios: A list of virtual scenarios to be managed by the scheduler. + """ self._discovered = OrderedDict((scn.unique_id, scn) for scn in scenarios) @property def discovered(self) -> Iterator[VirtualScenario]: + """ + Get an iterator over the discovered scenarios. + + Discovered scenarios are those initially provided to the scheduler. + + :return: An iterator over the discovered virtual scenarios. + """ for scenario_id in self._discovered: yield self._discovered[scenario_id] @property @abstractmethod def scheduled(self) -> Iterator[VirtualScenario]: + """ + Get an iterator over the scheduled scenarios. + + Scheduled scenarios are those that are currently queued for execution. + This property must be implemented by subclasses. + + :return: An iterator over the scheduled virtual scenarios. + """ pass @abstractmethod def schedule(self, scenario: VirtualScenario) -> None: + """ + Schedule a scenario for execution. + + This method must be implemented by subclasses to handle the addition + of a scenario to the scheduler's execution queue. + + :param scenario: The virtual scenario to be scheduled. + """ pass @abstractmethod def ignore(self, scenario: VirtualScenario) -> None: + """ + Remove a scenario from the scheduler. + + This method must be implemented by subclasses to handle the removal + of a scenario from both the discovered and scheduled lists. + + :param scenario: The virtual scenario to be ignored. + """ pass @abstractmethod def aggregate_results(self, scenario_results: List[ScenarioResult]) -> AggregatedResult: + """ + Aggregate the results of a scenario's executions. + + This method must be implemented by subclasses to create an aggregated + result representing the combined outcomes of a scenario's executions. + + :param scenario_results: A list of scenario results to be aggregated. + :return: An aggregated result representing the combined outcome of the executions. + """ pass def __aiter__(self) -> "ScenarioScheduler": + """ + Prepare the scheduler for asynchronous iteration. + + :return: The scheduler instance itself. + """ return self @abstractmethod async def __anext__(self) -> VirtualScenario: + """ + Retrieve the next scenario to be executed asynchronously. + + This method must be implemented by subclasses to provide support + for asynchronous iteration over scheduled scenarios. + + :return: The next virtual scenario to be executed. + :raises StopAsyncIteration: If no more scenarios are available for execution. + """ pass def __repr__(self) -> str: + """ + Return a string representation of the scheduler. + + :return: A string describing the scheduler instance. + """ return f"<{self.__class__.__name__}>" diff --git a/vedro/events/__init__.py b/vedro/events/__init__.py index b8a23a0c..b4bb6607 100644 --- a/vedro/events/__init__.py +++ b/vedro/events/__init__.py @@ -14,153 +14,376 @@ class ConfigLoadedEvent(Event): + """ + Represents the event triggered when the configuration is loaded. + + This event provides access to the configuration file path and the loaded configuration object. + """ + def __init__(self, path: Path, config: ConfigType) -> None: + """ + Initialize the ConfigLoadedEvent. + + :param path: The path to the configuration file. + :param config: The loaded configuration object. + """ self._path = path self._config = config @property def path(self) -> Path: + """ + Get the path to the configuration file. + + :return: The configuration file path. + """ return self._path @property def config(self) -> ConfigType: + """ + Get the loaded configuration object. + + :return: The configuration object. + """ return self._config def __repr__(self) -> str: + """ + Return a string representation of the event. + + :return: A string describing the event. + """ return f"{self.__class__.__name__}({self._path!r}, )" class ArgParseEvent(Event): + """ + Represents the event triggered when command-line arguments are being parsed. + + This event allows plugins to modify the argument parser. + """ + def __init__(self, arg_parser: ArgumentParser) -> None: + """ + Initialize the ArgParseEvent. + + :param arg_parser: The argument parser used for command-line argument parsing. + """ self._arg_parser = arg_parser @property def arg_parser(self) -> ArgumentParser: + """ + Get the argument parser. + + :return: The argument parser instance. + """ return self._arg_parser def __repr__(self) -> str: + """ + Return a string representation of the event. + + :return: A string describing the event. + """ return f"{self.__class__.__name__}({self._arg_parser!r})" class ArgParsedEvent(Event): + """ + Represents the event triggered after command-line arguments have been parsed. + + This event provides access to the parsed arguments. + """ + def __init__(self, args: Namespace) -> None: + """ + Initialize the ArgParsedEvent. + + :param args: The parsed command-line arguments. + """ self._args = args @property def args(self) -> Namespace: + """ + Get the parsed arguments. + + :return: A namespace containing the parsed arguments. + """ return self._args def __repr__(self) -> str: + """ + Return a string representation of the event. + + :return: A string describing the event. + """ return f"{self.__class__.__name__}({self._args!r})" class StartupEvent(Event): + """ + Represents the event triggered at the beginning of the test execution. + + This event provides access to the scenario scheduler. + """ + def __init__(self, scheduler: ScenarioScheduler) -> None: + """ + Initialize the StartupEvent. + + :param scheduler: The scheduler used to manage scenarios. + """ self._scheduler = scheduler @property def scheduler(self) -> ScenarioScheduler: + """ + Get the scenario scheduler. + + :return: The scenario scheduler instance. + """ return self._scheduler @property def scenarios(self) -> List[VirtualScenario]: + """ + Get the list of discovered scenarios. + + :return: A list of virtual scenarios. + """ warnings.warn("Deprecated: use scheduler.discovered instead", DeprecationWarning) return list(self._scheduler.discovered) def __repr__(self) -> str: + """ + Return a string representation of the event. + + :return: A string describing the event. + """ return f"{self.__class__.__name__}({self._scheduler!r})" class _ScenarioEvent(Event): + """ + Base class for events related to scenarios. + + This event provides access to the scenario result. + """ + def __init__(self, scenario_result: ScenarioResult) -> None: + """ + Initialize the _ScenarioEvent. + + :param scenario_result: The result of the scenario. + """ self._scenario_result = scenario_result @property def scenario_result(self) -> ScenarioResult: + """ + Get the scenario result. + + :return: The scenario result instance. + """ return self._scenario_result def __repr__(self) -> str: + """ + Return a string representation of the event. + + :return: A string describing the event. + """ return f"{self.__class__.__name__}({self._scenario_result!r})" class ScenarioSkippedEvent(_ScenarioEvent): + """ + Represents the event triggered when a scenario is skipped. + """ pass class ScenarioFailedEvent(_ScenarioEvent): + """ + Represents the event triggered when a scenario fails. + """ pass class ScenarioRunEvent(_ScenarioEvent): + """ + Represents the event triggered when a scenario starts running. + """ pass class ScenarioPassedEvent(_ScenarioEvent): + """ + Represents the event triggered when a scenario passes. + """ pass class _StepEvent(Event): + """ + Base class for events related to steps. + + This event provides access to the step result. + """ + def __init__(self, step_result: StepResult) -> None: + """ + Initialize the _StepEvent. + + :param step_result: The result of the step. + """ self._step_result = step_result @property def step_result(self) -> StepResult: + """ + Get the step result. + + :return: The step result instance. + """ return self._step_result def __repr__(self) -> str: + """ + Return a string representation of the event. + + :return: A string describing the event. + """ return f"{self.__class__.__name__}({self._step_result!r})" class StepRunEvent(_StepEvent): + """ + Represents the event triggered when a step starts running. + """ pass class StepFailedEvent(_StepEvent): + """ + Represents the event triggered when a step fails. + """ pass class StepPassedEvent(_StepEvent): + """ + Represents the event triggered when a step passes. + """ pass class ExceptionRaisedEvent(Event): + """ + Represents the event triggered when an exception is raised. + + This event provides access to exception information. + """ + def __init__(self, exc_info: ExcInfo) -> None: + """ + Initialize the ExceptionRaisedEvent. + + :param exc_info: The exception information object. + """ self._exc_info = exc_info @property def exc_info(self) -> ExcInfo: + """ + Get the exception information. + + :return: The exception information object. + """ return self._exc_info def __repr__(self) -> str: + """ + Return a string representation of the event. + + :return: A string describing the event. + """ return f"{self.__class__.__name__}({self._exc_info!r})" class ScenarioReportedEvent(Event): + """ + Represents the event triggered after a scenario is reported. + + This event provides access to the aggregated result of the scenario. + """ + def __init__(self, aggregated_result: AggregatedResult) -> None: + """ + Initialize the ScenarioReportedEvent. + + :param aggregated_result: The aggregated result of the scenario. + """ self._aggregated_result = aggregated_result @property def aggregated_result(self) -> AggregatedResult: + """ + Get the aggregated result of the scenario. + + :return: The aggregated result instance. + """ return self._aggregated_result def __repr__(self) -> str: - return f"{self.__class__.__name__}({self._aggregated_result!r}" + """ + Return a string representation of the event. + + :return: A string describing the event. + """ + return f"{self.__class__.__name__}({self._aggregated_result!r})" class CleanupEvent(Event): + """ + Represents the event triggered at the end of test execution. + + This event provides access to the test execution report. + """ + def __init__(self, report: Report) -> None: + """ + Initialize the CleanupEvent. + + :param report: The test execution report. + """ self._report = report @property def report(self) -> Report: + """ + Get the test execution report. + + :return: The report instance. + """ return self._report def __repr__(self) -> str: + """ + Return a string representation of the event. + + :return: A string describing the event. + """ return f"{self.__class__.__name__}({self._report!r})" -__all__ = ("Event", "ConfigLoadedEvent", "ArgParseEvent", "ArgParsedEvent", - "StartupEvent", "ScenarioRunEvent", "ScenarioSkippedEvent", - "ScenarioFailedEvent", "ScenarioPassedEvent", - "StepRunEvent", "StepFailedEvent", "StepPassedEvent", "ExceptionRaisedEvent", - "ScenarioReportedEvent", "CleanupEvent",) +__all__ = ( + "Event", "ConfigLoadedEvent", "ArgParseEvent", "ArgParsedEvent", + "StartupEvent", "ScenarioRunEvent", "ScenarioSkippedEvent", + "ScenarioFailedEvent", "ScenarioPassedEvent", + "StepRunEvent", "StepFailedEvent", "StepPassedEvent", "ExceptionRaisedEvent", + "ScenarioReportedEvent", "CleanupEvent", +)