From 50d6f2143dbad6ca513c2617bdd836ee7f6af607 Mon Sep 17 00:00:00 2001 From: maxi297 Date: Wed, 4 Dec 2024 21:00:04 -0500 Subject: [PATCH] Try to fix jinja memory leak --- .../declarative/interpolation/jinja.py | 69 ++++++++++--------- .../sources/streams/http/http_client.py | 1 - 2 files changed, 35 insertions(+), 35 deletions(-) diff --git a/airbyte_cdk/sources/declarative/interpolation/jinja.py b/airbyte_cdk/sources/declarative/interpolation/jinja.py index ecbe9a349..f1f126e91 100644 --- a/airbyte_cdk/sources/declarative/interpolation/jinja.py +++ b/airbyte_cdk/sources/declarative/interpolation/jinja.py @@ -4,7 +4,7 @@ import ast from functools import cache -from typing import Any, Mapping, Optional, Tuple, Type +from typing import Any, Mapping, Optional, Set, Tuple, Type from jinja2 import meta from jinja2.environment import Template @@ -30,6 +30,34 @@ def is_safe_attribute(self, obj: Any, attr: str, value: Any) -> bool: return super().is_safe_attribute(obj, attr, value) # type: ignore # for some reason, mypy says 'Returning Any from function declared to return "bool"' +# These aliases are used to deprecate existing keywords without breaking all existing connectors. +_ALIASES = { + "stream_interval": "stream_slice", # Use stream_interval to access incremental_sync values + "stream_partition": "stream_slice", # Use stream_partition to access partition router's values +} + +# These extensions are not installed so they're not currently a problem, +# but we're still explicitely removing them from the jinja context. +# At worst, this is documentation that we do NOT want to include these extensions because of the potential security risks +_RESTRICTED_EXTENSIONS = ["jinja2.ext.loopcontrols"] # Adds support for break continue in loops + +# By default, these Python builtin functions are available in the Jinja context. +# We explicitely remove them because of the potential security risk. +# Please add a unit test to test_jinja.py when adding a restriction. +_RESTRICTED_BUILTIN_FUNCTIONS = [ + "range" +] # The range function can cause very expensive computations + +_ENVIRONMENT = StreamPartitionAccessEnvironment() +_ENVIRONMENT.filters.update(**filters) +_ENVIRONMENT.globals.update(**macros) + +for extension in _RESTRICTED_EXTENSIONS: + _ENVIRONMENT.extensions.pop(extension, None) +for builtin in _RESTRICTED_BUILTIN_FUNCTIONS: + _ENVIRONMENT.globals.pop(builtin, None) + + class JinjaInterpolation(Interpolation): """ Interpolation strategy using the Jinja2 template engine. @@ -48,34 +76,6 @@ class JinjaInterpolation(Interpolation): Additional information on jinja templating can be found at https://jinja.palletsprojects.com/en/3.1.x/templates/# """ - # These aliases are used to deprecate existing keywords without breaking all existing connectors. - ALIASES = { - "stream_interval": "stream_slice", # Use stream_interval to access incremental_sync values - "stream_partition": "stream_slice", # Use stream_partition to access partition router's values - } - - # These extensions are not installed so they're not currently a problem, - # but we're still explicitely removing them from the jinja context. - # At worst, this is documentation that we do NOT want to include these extensions because of the potential security risks - RESTRICTED_EXTENSIONS = ["jinja2.ext.loopcontrols"] # Adds support for break continue in loops - - # By default, these Python builtin functions are available in the Jinja context. - # We explicitely remove them because of the potential security risk. - # Please add a unit test to test_jinja.py when adding a restriction. - RESTRICTED_BUILTIN_FUNCTIONS = [ - "range" - ] # The range function can cause very expensive computations - - def __init__(self) -> None: - self._environment = StreamPartitionAccessEnvironment() - self._environment.filters.update(**filters) - self._environment.globals.update(**macros) - - for extension in self.RESTRICTED_EXTENSIONS: - self._environment.extensions.pop(extension, None) - for builtin in self.RESTRICTED_BUILTIN_FUNCTIONS: - self._environment.globals.pop(builtin, None) - def eval( self, input_str: str, @@ -86,7 +86,7 @@ def eval( ) -> Any: context = {"config": config, **additional_parameters} - for alias, equivalent in self.ALIASES.items(): + for alias, equivalent in _ALIASES.items(): if alias in context: # This is unexpected. We could ignore or log a warning, but failing loudly should result in fewer surprises raise ValueError( @@ -105,6 +105,7 @@ def eval( raise Exception(f"Expected a string, got {input_str}") except UndefinedError: pass + # If result is empty or resulted in an undefined error, evaluate and return the default string return self._literal_eval(self._eval(default, context), valid_types) @@ -132,16 +133,16 @@ def _eval(self, s: Optional[str], context: Mapping[str, Any]) -> Optional[str]: return s @cache - def _find_undeclared_variables(self, s: Optional[str]) -> Template: + def _find_undeclared_variables(self, s: Optional[str]) -> Set[str]: """ Find undeclared variables and cache them """ - ast = self._environment.parse(s) # type: ignore # parse is able to handle None + ast = _ENVIRONMENT.parse(s) # type: ignore # parse is able to handle None return meta.find_undeclared_variables(ast) @cache - def _compile(self, s: Optional[str]) -> Template: + def _compile(self, s: str) -> Template: """ We must cache the Jinja Template ourselves because we're using `from_string` instead of a template loader """ - return self._environment.from_string(s) + return _ENVIRONMENT.from_string(s) diff --git a/airbyte_cdk/sources/streams/http/http_client.py b/airbyte_cdk/sources/streams/http/http_client.py index e2737457f..14b1cdcf7 100644 --- a/airbyte_cdk/sources/streams/http/http_client.py +++ b/airbyte_cdk/sources/streams/http/http_client.py @@ -538,7 +538,6 @@ def _write(self, key: str, value: str) -> None: logger.warning(f"Error while saving item to cache: {exception}") - class SkipFailureSQLiteCache(requests_cache.backends.sqlite.SQLiteCache): def __init__( # type: ignore # ignoring as lib is not typed self,