Skip to content

Commit

Permalink
refactor!(server/pipeline): raise UndefinedVariableError when variabl…
Browse files Browse the repository at this point in the history
…es are missing during rendering [TCTC-10013] (#2361)

* fix(toucan_connectors): get UndefinedVariableErrors instead of validation errors

* fix: imports

* Update server/tests/test_pipeline.py

Co-authored-by: Luka Peschke <[email protected]>

* doc: changelog

* Update server/CHANGELOG.md

Co-authored-by: Luka Peschke <[email protected]>

---------

Co-authored-by: Luka Peschke <[email protected]>
  • Loading branch information
Fanaen and lukapeschke authored Mar 3, 2025
1 parent 3c84674 commit 22fea2e
Show file tree
Hide file tree
Showing 13 changed files with 72 additions and 32 deletions.
3 changes: 3 additions & 0 deletions server/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

## Unreleased

### Changed
- Missing variables during rendering now cause a `UndefinedVariableError` to be raised instead of a `ValidationError`.

## [0.51.2] - 2025-02-20

### Fixed
Expand Down
11 changes: 11 additions & 0 deletions server/src/weaverbird/utils/toucan_connectors.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
from toucan_connectors.common import nosql_apply_parameters_to_query


def nosql_apply_parameters_to_query_with_errors(
query: dict | list[dict] | tuple | str, parameters: dict | None, **kwargs
):
"""
When a variable is missing, it may lead to undefined values and therefore pydantic validation errors.
Instead, we want UndefinedVariableError so we force `handle_errors=True` which does exactly this.
"""
return nosql_apply_parameters_to_query(query, parameters, handle_errors=True, **kwargs)
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@
import pytest
from pandas.api.types import is_bool_dtype, is_datetime64_any_dtype, is_numeric_dtype
from pymongo import MongoClient
from toucan_connectors.common import nosql_apply_parameters_to_query

from tests.utils import assert_dataframes_content_equals, get_spec_from_json_fixture, retrieve_case
from weaverbird.backends.mongo_translator.mongo_pipeline_translator import translate_pipeline
from weaverbird.pipeline import PipelineWithVariables
from weaverbird.utils.toucan_connectors import nosql_apply_parameters_to_query_with_errors

exec_type = "mongo"
test_cases = retrieve_case("mongo_translator", exec_type)
Expand Down Expand Up @@ -96,7 +96,9 @@ def test_mongo_translator_pipeline(mongo_database, case_id, case_spec_file_path,

# create query
steps = spec["step"]["pipeline"]
pipeline = PipelineWithVariables(steps=steps).render(available_variables, nosql_apply_parameters_to_query)
pipeline = PipelineWithVariables(steps=steps).render(
available_variables, nosql_apply_parameters_to_query_with_errors
)
query = translate_pipeline(pipeline)
# execute query
result = list(mongo_database[collection_uid].aggregate(query))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@
import geopandas as gpd
import pandas as pd
import pytest
from toucan_connectors.common import nosql_apply_parameters_to_query

from tests.utils import assert_dataframes_equals, get_spec_from_json_fixture, retrieve_case
from weaverbird.backends.pandas_executor import execute_pipeline
from weaverbird.pipeline import PipelineWithVariables
from weaverbird.utils.toucan_connectors import nosql_apply_parameters_to_query_with_errors

test_cases = retrieve_case("pandas_executor", "pandas")

Expand All @@ -30,7 +30,9 @@ def test_pandas_execute_pipeline(case_id, case_spec_file_path, available_variabl

steps = spec["step"]["pipeline"]
steps.insert(0, {"name": "domain", "domain": "in"})
pipeline = PipelineWithVariables(steps=steps).render(available_variables, nosql_apply_parameters_to_query)
pipeline = PipelineWithVariables(steps=steps).render(
available_variables, nosql_apply_parameters_to_query_with_errors
)

domains = {"in": df_in, **dfs_in_others}
result = execute_pipeline(pipeline, domain_retriever=lambda x: domains[x])[0]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@
import pandas as pd
import pytest
from boto3 import Session
from toucan_connectors.common import nosql_apply_parameters_to_query

from tests.utils import assert_dataframes_equals, get_spec_from_json_fixture, retrieve_case
from weaverbird.backends.pypika_translator.dialects import SQLDialect
from weaverbird.backends.pypika_translator.translate import translate_pipeline
from weaverbird.pipeline import PipelineWithVariables
from weaverbird.utils.toucan_connectors import nosql_apply_parameters_to_query_with_errors

_REGION = environ["ATHENA_REGION"]
_DB = environ["ATHENA_DATABASE"]
Expand Down Expand Up @@ -57,7 +57,9 @@ def test_athena_translator_pipeline(
pipeline_spec = get_spec_from_json_fixture(case_id, case_spec_file)

steps = [{"name": "domain", "domain": "beers_tiny"}] + pipeline_spec["step"]["pipeline"]
pipeline = PipelineWithVariables(steps=steps).render(available_variables, nosql_apply_parameters_to_query)
pipeline = PipelineWithVariables(steps=steps).render(
available_variables, nosql_apply_parameters_to_query_with_errors
)

query = translate_pipeline(
sql_dialect=SQLDialect.ATHENA,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
from google.cloud.bigquery import Client
from google.oauth2 import service_account
from pandas.api.types import is_datetime64_any_dtype
from toucan_connectors.common import nosql_apply_parameters_to_query

from tests.utils import (
_BEERS_TABLE_COLUMNS,
Expand All @@ -18,6 +17,7 @@
from weaverbird.backends.pypika_translator.dialects import SQLDialect
from weaverbird.backends.pypika_translator.translate import translate_pipeline
from weaverbird.pipeline import PipelineWithVariables
from weaverbird.utils.toucan_connectors import nosql_apply_parameters_to_query_with_errors

credentials = service_account.Credentials.from_service_account_info(
info=json.loads(environ["GOOGLE_BIG_QUERY_CREDENTIALS"])
Expand Down Expand Up @@ -51,7 +51,9 @@ def test_bigquery_translator_pipeline(
steps = [{"name": "domain", "domain": "beers_tiny"}] + pipeline_spec["step"]["pipeline"]
table_columns = _BEERS_TABLE_COLUMNS

pipeline = PipelineWithVariables(steps=steps).render(available_variables, nosql_apply_parameters_to_query)
pipeline = PipelineWithVariables(steps=steps).render(
available_variables, nosql_apply_parameters_to_query_with_errors
)

query = translate_pipeline(
sql_dialect=SQLDialect.GOOGLEBIGQUERY,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import pytest
from docker.types import Ulimit
from sqlalchemy import create_engine, text
from toucan_connectors.common import nosql_apply_parameters_to_query

from tests.utils import (
_BEERS_TABLE_COLUMNS,
Expand All @@ -21,6 +20,7 @@
from weaverbird.backends.pypika_translator.dialects import SQLDialect
from weaverbird.backends.pypika_translator.translate import translate_pipeline
from weaverbird.pipeline import PipelineWithVariables
from weaverbird.utils.toucan_connectors import nosql_apply_parameters_to_query_with_errors

_CON_PARAMS = {
"host": "127.0.0.1",
Expand Down Expand Up @@ -79,7 +79,9 @@ def test_sql_translator_pipeline(case_id: str, case_spec_file_path: str, engine:

steps = spec["step"]["pipeline"]
steps.insert(0, {"name": "domain", "domain": "beers_tiny"})
pipeline = PipelineWithVariables(steps=steps).render(available_variables, nosql_apply_parameters_to_query)
pipeline = PipelineWithVariables(steps=steps).render(
available_variables, nosql_apply_parameters_to_query_with_errors
)

# Convert Pipeline object to Postgres Query
query = translate_pipeline(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import pytest
from sqlalchemy import create_engine, text
from sqlalchemy.engine.base import OptionEngine
from toucan_connectors.common import nosql_apply_parameters_to_query

from tests.utils import (
_BEERS_TABLE_COLUMNS,
Expand All @@ -21,6 +20,7 @@
from weaverbird.backends.pypika_translator.dialects import SQLDialect
from weaverbird.backends.pypika_translator.translate import translate_pipeline
from weaverbird.pipeline import PipelineWithVariables
from weaverbird.utils.toucan_connectors import nosql_apply_parameters_to_query_with_errors

con_params = {
"host": "127.0.0.1",
Expand Down Expand Up @@ -75,7 +75,9 @@ def test_sql_translator_pipeline(

steps = spec["step"]["pipeline"]
steps.insert(0, {"name": "domain", "domain": "beers_tiny"})
pipeline = PipelineWithVariables(steps=steps).render(available_variables, nosql_apply_parameters_to_query)
pipeline = PipelineWithVariables(steps=steps).render(
available_variables, nosql_apply_parameters_to_query_with_errors
)

# Convert Pipeline object to Postgres Query
query = translate_pipeline(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@
import pytest
import redshift_connector
from tenacity import retry, stop_after_attempt, wait_fixed
from toucan_connectors.common import nosql_apply_parameters_to_query

from tests.utils import assert_dataframes_equals, get_spec_from_json_fixture, retrieve_case
from weaverbird.backends.pypika_translator.dialects import SQLDialect
from weaverbird.backends.pypika_translator.translate import translate_pipeline
from weaverbird.pipeline import PipelineWithVariables
from weaverbird.utils.toucan_connectors import nosql_apply_parameters_to_query_with_errors

_HOST = environ["REDSHIFT_HOST"]
_USER = environ["REDSHIFT_USER"]
Expand Down Expand Up @@ -51,7 +51,9 @@ def test_redshift_translator_pipeline(
pipeline_spec = get_spec_from_json_fixture(case_id, case_spec_file)

steps = [{"name": "domain", "domain": "beers_tiny"}] + pipeline_spec["step"]["pipeline"]
pipeline = PipelineWithVariables(steps=steps).render(available_variables, nosql_apply_parameters_to_query)
pipeline = PipelineWithVariables(steps=steps).render(
available_variables, nosql_apply_parameters_to_query_with_errors
)

query = translate_pipeline(
sql_dialect=SQLDialect.REDSHIFT,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@
import pytest
from snowflake.sqlalchemy import URL
from sqlalchemy import create_engine, text
from toucan_connectors.common import nosql_apply_parameters_to_query

from tests.utils import assert_dataframes_equals, get_spec_from_json_fixture, retrieve_case
from weaverbird.backends.pypika_translator.dialects import SQLDialect
from weaverbird.backends.pypika_translator.translate import translate_pipeline
from weaverbird.pipeline import PipelineWithVariables
from weaverbird.utils.toucan_connectors import nosql_apply_parameters_to_query_with_errors

_ACCOUNT = "toucantocopartner.west-europe.azure"
_USER = "toucan_test"
Expand Down Expand Up @@ -57,7 +57,9 @@ def test_snowflake_translator_pipeline(engine: Any, case_id: str, case_spec_file
pipeline_spec = get_spec_from_json_fixture(case_id, case_spec_file)

steps = [{"name": "domain", "domain": "beers_tiny"}] + pipeline_spec["step"]["pipeline"]
pipeline = PipelineWithVariables(steps=steps).render(available_variables, nosql_apply_parameters_to_query)
pipeline = PipelineWithVariables(steps=steps).render(
available_variables, nosql_apply_parameters_to_query_with_errors
)

query = translate_pipeline(
sql_dialect=SQLDialect.SNOWFLAKE,
Expand Down
16 changes: 6 additions & 10 deletions server/tests/steps/test_filter.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,14 @@
import pytest
from pandas import DataFrame, read_json
from pandas.testing import assert_frame_equal
from toucan_connectors.common import nosql_apply_parameters_to_query

from tests.utils import assert_dataframes_equals
from weaverbird.backends.pandas_executor.steps.filter import execute_filter
from weaverbird.pipeline.conditions import (
ComparisonCondition,
ConditionComboAnd,
DateBoundCondition,
)
from weaverbird.pipeline.conditions import ComparisonCondition, ConditionComboAnd, DateBoundCondition
from weaverbird.pipeline.dates import RelativeDateWithVariables
from weaverbird.pipeline.steps import FilterStep
from weaverbird.pipeline.steps.filter import FilterStepWithVariables
from weaverbird.utils.toucan_connectors import nosql_apply_parameters_to_query_with_errors


@pytest.fixture
Expand Down Expand Up @@ -439,7 +435,7 @@ def test_render_filter_step_with_variables(available_variables: dict[str, Any])
"operator": "from",
}
)
rendered = step.render(available_variables, nosql_apply_parameters_to_query)
rendered = step.render(available_variables, nosql_apply_parameters_to_query_with_errors)
assert rendered.condition.value == available_variables["TODAY"]

step = FilterStepWithVariables(
Expand All @@ -454,13 +450,13 @@ def test_render_filter_step_with_variables(available_variables: dict[str, Any])
"operator": "from",
}
)
rendered = step.render(available_variables, nosql_apply_parameters_to_query)
rendered = step.render(available_variables, nosql_apply_parameters_to_query_with_errors)
assert rendered.condition.value.date == available_variables["TODAY"]

step = FilterStepWithVariables(condition={"or": [step.model_dump()["condition"]]})
rendered = step.render(available_variables, nosql_apply_parameters_to_query)
rendered = step.render(available_variables, nosql_apply_parameters_to_query_with_errors)
assert rendered.condition.or_[0].value.date == available_variables["TODAY"]

step = FilterStepWithVariables(condition={"column": "int_column", "operator": "in", "value": "{{ INTEGER_LIST }}"})
rendered = step.render(available_variables, nosql_apply_parameters_to_query)
rendered = step.render(available_variables, nosql_apply_parameters_to_query_with_errors)
assert rendered.condition.value == available_variables["INTEGER_LIST"] == [1, 2, 3]
6 changes: 3 additions & 3 deletions server/tests/steps/test_ifthenelse.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@

import pytest
from pandas import NA, DataFrame
from toucan_connectors.common import nosql_apply_parameters_to_query

from tests.utils import assert_dataframes_equals
from weaverbird.backends.pandas_executor.steps.ifthenelse import execute_ifthenelse
from weaverbird.pipeline.steps.ifthenelse import IfthenelseStep, IfThenElseStepWithVariables
from weaverbird.utils.toucan_connectors import nosql_apply_parameters_to_query_with_errors


@pytest.fixture
Expand Down Expand Up @@ -197,7 +197,7 @@ def test_render_ifthenelsestep_step_with_variables(available_variables: dict[str
"new_column": "coucou",
}
)
rendered = step.render(available_variables, nosql_apply_parameters_to_query)
rendered = step.render(available_variables, nosql_apply_parameters_to_query_with_errors)
assert rendered.condition.value == available_variables["TODAY"]

step = IfThenElseStepWithVariables(
Expand All @@ -218,5 +218,5 @@ def test_render_ifthenelsestep_step_with_variables(available_variables: dict[str
}
)

rendered = step.render(available_variables, nosql_apply_parameters_to_query)
rendered = step.render(available_variables, nosql_apply_parameters_to_query_with_errors)
assert rendered.condition.value.date == available_variables["TODAY"]
20 changes: 17 additions & 3 deletions server/tests/test_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import pytest
from jinja2.nativetypes import NativeEnvironment
from pydantic import BaseModel
from toucan_connectors.common import nosql_apply_parameters_to_query
from toucan_connectors.common import UndefinedVariableError

from weaverbird.pipeline.conditions import ComparisonCondition, InclusionCondition
from weaverbird.pipeline.pipeline import (
Expand All @@ -21,6 +21,7 @@
from weaverbird.pipeline.steps.aggregate import Aggregation
from weaverbird.pipeline.steps.append import AppendStep
from weaverbird.pipeline.steps.text import TextStep
from weaverbird.utils.toucan_connectors import nosql_apply_parameters_to_query_with_errors


class Case(BaseModel):
Expand Down Expand Up @@ -64,7 +65,7 @@ def get_render_variables_test_cases():
def test_step_with_variables(case: Case):
pipeline_with_variables = PipelineWithVariables(**case.data)

pipeline = pipeline_with_variables.render(case.context, renderer=nosql_apply_parameters_to_query)
pipeline = pipeline_with_variables.render(case.context, renderer=nosql_apply_parameters_to_query_with_errors)

expected_result = Pipeline(steps=case.expected_result)
assert pipeline == expected_result
Expand All @@ -75,7 +76,7 @@ def test_custom_sql_step_with_variables():
variables = {"__front_var_0__": "-- DROP TABLE users;"}

pipeline_with_variables = PipelineWithVariables(steps=steps)
pipeline = pipeline_with_variables.render(variables, renderer=nosql_apply_parameters_to_query)
pipeline = pipeline_with_variables.render(variables, renderer=nosql_apply_parameters_to_query_with_errors)

# It should not have been rendered:
assert pipeline.steps[0].query == "{{ __front_var_0__ }}"
Expand Down Expand Up @@ -657,3 +658,16 @@ def test_keep_condition_if_empty_list_from_filter_steps(
):
expected_steps = [expected_step] if expected_step is not None else []
assert remove_void_conditions_from_filter_steps([step]) == expected_steps


def test_raises_error_when_variables_are_missing():
with pytest.raises(UndefinedVariableError):
PipelineWithVariables(
steps=[
{"name": "domain", "domain": "beers"},
{
"name": "filter",
"condition": {"column": "price", "value": "{{ __front_var_0__ }}", "operator": "lt"},
},
]
).render({}, nosql_apply_parameters_to_query_with_errors)

0 comments on commit 22fea2e

Please sign in to comment.