Skip to content

Commit

Permalink
Fix multiple chained operands parsing in logical expressions (#110)
Browse files Browse the repository at this point in the history
  • Loading branch information
tsv1 authored Dec 20, 2024
1 parent 87297ba commit b75b1c7
Show file tree
Hide file tree
Showing 5 changed files with 104 additions and 34 deletions.
4 changes: 2 additions & 2 deletions tests/plugins/rerunner/test_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ def test_aggregate_nothing(scheduler: Scheduler):
lambda: [make_scenario_result().mark_failed(), make_scenario_result().mark_failed()],
lambda: [make_scenario_result().mark_skipped(), make_scenario_result().mark_skipped()],
])
def test_aggreate_results(get_scenario_results: Callable[[], List[ScenarioResult]], *,
scheduler: Scheduler):
def test_aggregate_results(get_scenario_results: Callable[[], List[ScenarioResult]], *,
scheduler: Scheduler):
with when:
scenario_results = get_scenario_results()
aggregated_result = scheduler.aggregate_results(scenario_results)
Expand Down
47 changes: 47 additions & 0 deletions tests/plugins/tagger/test_tagger_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,30 @@ async def test_tag_and_operator(*, dispatcher: Dispatcher):
assert list(scheduler.scheduled) == [scenarios[2]]


@pytest.mark.usefixtures(tagger.__name__)
async def test_tag_and_operators(*, dispatcher: Dispatcher):
with given:
await fire_arg_parsed_event(dispatcher, tags="SMOKE and P0 and API")

scenarios = [
make_vscenario(tags=["SMOKE"]),
make_vscenario(tags=["SMOKE", "P0"]),
make_vscenario(tags=["P0", "API"]),
make_vscenario(tags=["SMOKE", "API"]),
make_vscenario(tags=["SMOKE", "P0", "API"]),
make_vscenario(tags=["API"]),
make_vscenario(),
]
scheduler = Scheduler(scenarios)
startup_event = StartupEvent(scheduler)

with when:
await dispatcher.fire(startup_event)

with then:
assert list(scheduler.scheduled) == [scenarios[4]]


@pytest.mark.usefixtures(tagger.__name__)
async def test_tag_or_operator(*, dispatcher: Dispatcher):
with given:
Expand All @@ -134,6 +158,29 @@ async def test_tag_or_operator(*, dispatcher: Dispatcher):
assert list(scheduler.scheduled) == [scenarios[0], scenarios[2]]


@pytest.mark.usefixtures(tagger.__name__)
async def test_tag_or_operators(*, dispatcher: Dispatcher):
with given:
await fire_arg_parsed_event(dispatcher, tags="SMOKE or P0 or API")

scenarios = [
make_vscenario(),
make_vscenario(tags=["SMOKE"]),
make_vscenario(tags=["P0"]),
make_vscenario(tags=["SMOKE", "P0"]),
make_vscenario(tags=["API"]),
make_vscenario(tags=["SMOKE", "P0", "API"]),
]
scheduler = Scheduler(scenarios)
startup_event = StartupEvent(scheduler)

with when:
await dispatcher.fire(startup_event)

with then:
assert list(scheduler.scheduled) == scenarios[1:]


@pytest.mark.usefixtures(tagger.__name__)
async def test_tags_expr(*, dispatcher: Dispatcher):
with given:
Expand Down
2 changes: 1 addition & 1 deletion vedro/plugins/repeater/_repeater.py
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ async def on_scenario_end(self,
"""
if not self._is_repeating_enabled():
return
assert isinstance(self._scheduler, RepeaterScenarioScheduler) # for type checking
assert isinstance(self._scheduler, ScenarioScheduler) # for type checking

scenario = event.scenario_result.scenario
if scenario.unique_id != self._repeat_scenario_id:
Expand Down
2 changes: 1 addition & 1 deletion vedro/plugins/rerunner/_rerunner.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ async def on_scenario_end(self,
"""
if not self._is_rerunning_enabled():
return
assert isinstance(self._scheduler, RerunnerScenarioScheduler) # for type checking
assert isinstance(self._scheduler, ScenarioScheduler) # for type checking

scenario = event.scenario_result.scenario
if scenario.unique_id != self._rerun_scenario_id:
Expand Down
83 changes: 53 additions & 30 deletions vedro/plugins/tagger/logic_tag_matcher/_logic_tag_matcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ class LogicTagMatcher(TagMatcher):
Implements a tag matcher that evaluates logical expressions using the `And`, `Or`,
and `Not` operators.
This class parses a logical expression string consisting of operands (tags) and logical
operators, and evaluates whether a given set of tags satisfies the parsed expression.
Tags are checked for membership in the set, and logical operators are applied according to
standard boolean logic.
This class takes a logical expression string consisting of tag operands and logical
operators, parses it into an expression tree, and then evaluates whether a given set of tags
satisfies the parsed expression. Tags are checked for membership in the input set, and
logical operators (and, or, not) are applied according to standard Boolean logic.
"""

tag_pattern = r'(?!and$|or$|not$)[A-Za-z_][A-Za-z0-9_]*'
Expand All @@ -30,8 +30,9 @@ def __init__(self, expr: str) -> None:
"""
Initialize the LogicTagMatcher with a logical expression string.
:param expr: The logical expression string to parse and evaluate.
The expression can contain tag names, 'and', 'or', 'not' logical operators.
:param expr: The logical expression to parse, which may include tags and the
logical operators 'and', 'or', and 'not'. For example:
"(API and not CLI) or UI".
"""
super().__init__(expr)
operand = Regex(self.tag_pattern, re.IGNORECASE).setParseAction(self._create_tag)
Expand All @@ -44,24 +45,27 @@ def __init__(self, expr: str) -> None:

def match(self, tags: Set[str]) -> bool:
"""
Match the provided set of tags against the parsed logical expression.
Evaluate the parsed expression against a set of tags.
:param tags: A set of strings representing tags to evaluate.
:return: True if the set of tags satisfies the logical expression, False otherwise.
:param tags: A set of strings representing tags to be tested against the expression.
:return: True if the expression evaluates to True given the tags, False otherwise.
"""
if self._grammar is None:
self._grammar = self._parse(self._parser, self._expr)
return self._grammar(tags)

def validate(self, tag: str) -> bool:
"""
Validate whether a tag conforms to the allowed tag pattern.
Validate that a given tag name meets the required criteria:
- Must be a string
- Must match the pattern: start with a letter or underscore, followed by letters, digits,
or underscores.
- Must not be a reserved keyword ('and', 'or', 'not').
:param tag: The tag to validate.
:return: True if the tag is valid, raises an exception otherwise.
:return: True if the tag is valid.
:raises TypeError: If the tag is not a string.
:raises ValueError: If the tag does not match the required pattern, or if it is a
reserved keyword.
:raises ValueError: If the tag is invalid or a reserved keyword.
"""
if not isinstance(tag, str):
raise TypeError(f"Tag must be a str, got {type(tag)}")
Expand All @@ -78,10 +82,13 @@ def _create_tag(self, orig: str, location: int, tokens: ParseResults) -> Expr:
"""
Create an Operand expression from a parsed tag.
This method is called when the parser recognizes a valid tag. It returns an Operand
that checks for membership of the tag in a given set.
:param orig: The original input string (unused).
:param location: The location of the match in the string (unused).
:param tokens: The parsed tokens, where the first token is the tag.
:return: An Operand instance representing the parsed tag.
:param tokens: The parsed tokens, where the first token is the tag name.
:return: An Operand instance for the parsed tag.
"""
tag = tokens[0]
return Operand(tag)
Expand All @@ -90,9 +97,12 @@ def _create_not(self, orig: str, location: int, tokens: ParseResults) -> Expr:
"""
Create a Not operator expression from parsed tokens.
The 'not' operator is unary, so this method is called when the parser recognizes a
pattern like "not <operand>". It returns a Not instance that negates the operand's result.
:param orig: The original input string (unused).
:param location: The location of the match in the string (unused).
:param tokens: The parsed tokens, where the operand to negate is in the tokens.
:param tokens: The parsed tokens, which will contain a single operand to be negated.
:return: A Not instance representing the negation of the operand.
"""
operand = tokens[0][-1]
Expand All @@ -102,37 +112,50 @@ def _create_and(self, orig: str, location: int, tokens: ParseResults) -> Expr:
"""
Create an And operator expression from parsed tokens.
For chained 'and' operations, the parser may return multiple operands. For example,
parsing "A and B and C" may produce tokens like [[A, 'and', B, 'and', C]].
This method extracts all operands and folds them from left to right into nested And
expressions:
"A and B and C" -> And(And(A, B), C)
:param orig: The original input string (unused).
:param location: The location of the match in the string (unused).
:param tokens: The parsed tokens, where the first token is the left operand and
the last token is the right operand.
:return: An And instance representing the logical AND of the two operands.
:param tokens: The parsed tokens, which may include multiple operands and 'and' operators.
:return: A nested And expression representing the logical AND of all operands.
"""
left = tokens[0][0]
right = tokens[0][-1]
return And(left, right)
exprs = tokens[0][::2]
result = exprs[0]
for e in exprs[1:]:
result = And(result, e)
return cast(Expr, result)

def _create_or(self, orig: str, location: int, tokens: ParseResults) -> Expr:
"""
Create an Or operator expression from parsed tokens.
Similar to the 'and' operator handling, multiple operands can be chained with 'or'.
For example, "A or B or C" may yield tokens [[A, 'or', B, 'or', C]].
This method extracts all operands and folds them into nested Or expressions:
"A or B or C" -> Or(Or(A, B), C)
:param orig: The original input string (unused).
:param location: The location of the match in the string (unused).
:param tokens: The parsed tokens, where the first token is the left operand and
the last token is the right operand.
:return: An Or instance representing the logical OR of the two operands.
:param tokens: The parsed tokens, which may include multiple operands and 'or' operators.
:return: A nested Or expression representing the logical OR of all operands.
"""
left = tokens[0][0]
right = tokens[0][-1]
return Or(left, right)
exprs = tokens[0][::2]
result = exprs[0]
for e in exprs[1:]:
result = Or(result, e)
return cast(Expr, result)

def _parse(self, grammar: Parser, expr: str) -> Expr:
"""
Parse the provided logical expression using the grammar.
Parse the provided logical expression using the defined grammar.
:param grammar: The parser grammar to use for parsing the expression.
:param expr: The logical expression string to parse.
:return: An Expr instance representing the parsed logical expression.
:return: An Expr instance representing the parsed logical expression tree.
:raises ValueError: If the expression is invalid or cannot be parsed.
"""
try:
Expand Down

0 comments on commit b75b1c7

Please sign in to comment.