From ead74d60234f07208fc66ac9651e95f34756be37 Mon Sep 17 00:00:00 2001 From: Teddy Crepineau Date: Thu, 6 Feb 2025 10:39:09 +0100 Subject: [PATCH 1/2] fix: handle starburst JWToken expiration --- .../ingestion/source/database/trino/connection.py | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/ingestion/src/metadata/ingestion/source/database/trino/connection.py b/ingestion/src/metadata/ingestion/source/database/trino/connection.py index 6dbae4ac9c39..b41231aaffa8 100644 --- a/ingestion/src/metadata/ingestion/source/database/trino/connection.py +++ b/ingestion/src/metadata/ingestion/source/database/trino/connection.py @@ -13,7 +13,7 @@ Source connection handler """ from copy import deepcopy -from typing import Optional +from typing import Literal, Optional from urllib.parse import quote_plus from requests import Session @@ -135,6 +135,9 @@ def get_connection(connection: TrinoConnection) -> Engine: # here we are creating a copy of connection, because we need to dynamically # add auth params to connectionArguments, which we do no intend to store # in original connection object and in OpenMetadata database + from trino.sqlalchemy.dialect import TrinoDialect + TrinoDialect.is_disconnect = _is_disconnect + connection_copy = deepcopy(connection) if connection_copy.verify: connection_copy.connectionArguments = ( @@ -183,3 +186,10 @@ def test_connection( queries=queries, timeout_seconds=timeout_seconds, ) + +# pylint: disable=unused-argument +def _is_disconnect(self, e, connection, cursor): + """is_disconnect method for the Databricks dialect""" + if "JWT expired" in str(e): + return True + return False From a7166bdd6b1084fffda337d64ce2c3a674635a58 Mon Sep 17 00:00:00 2001 From: Teddy Crepineau Date: Thu, 6 Feb 2025 10:41:24 +0100 Subject: [PATCH 2/2] style: ran python linting --- .../metadata/ingestion/source/database/trino/connection.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/ingestion/src/metadata/ingestion/source/database/trino/connection.py b/ingestion/src/metadata/ingestion/source/database/trino/connection.py index b41231aaffa8..ce9f2d0cb645 100644 --- a/ingestion/src/metadata/ingestion/source/database/trino/connection.py +++ b/ingestion/src/metadata/ingestion/source/database/trino/connection.py @@ -13,7 +13,7 @@ Source connection handler """ from copy import deepcopy -from typing import Literal, Optional +from typing import Optional from urllib.parse import quote_plus from requests import Session @@ -136,6 +136,7 @@ def get_connection(connection: TrinoConnection) -> Engine: # add auth params to connectionArguments, which we do no intend to store # in original connection object and in OpenMetadata database from trino.sqlalchemy.dialect import TrinoDialect + TrinoDialect.is_disconnect = _is_disconnect connection_copy = deepcopy(connection) @@ -187,6 +188,7 @@ def test_connection( timeout_seconds=timeout_seconds, ) + # pylint: disable=unused-argument def _is_disconnect(self, e, connection, cursor): """is_disconnect method for the Databricks dialect"""