From 2676514b9fafd66e8486f1feb5041d6ca6478949 Mon Sep 17 00:00:00 2001 From: Josip Mrden Date: Wed, 19 Jun 2024 10:12:04 +0200 Subject: [PATCH 1/4] Add ibm-db2 migration --- python/migrate.py | 87 +++++++++++++++++++++++++++++++++++ python/requirements.txt | 1 + python/requirements_no_ml.txt | 1 + 3 files changed, 89 insertions(+) diff --git a/python/migrate.py b/python/migrate.py index 3e112dac4..e7ec9cd46 100644 --- a/python/migrate.py +++ b/python/migrate.py @@ -5,6 +5,7 @@ import oracledb import pyodbc import psycopg2 +import ibm_db import threading from typing import Any, Dict @@ -342,6 +343,92 @@ def cleanup_migrate_postgresql(): mgp.add_batch_read_proc(postgresql, init_migrate_postgresql, cleanup_migrate_postgresql) +# IBM DB2 dictionary to store connections and cursors by thread +db2_dict = {} + +def init_migrate_db2( + table_or_sql: str, + config: mgp.Map, + config_path: str = "", + params: mgp.Nullable[mgp.Any] = None, +): + global db2_dict + + if params: + _check_params_type(params) + + if len(config_path) > 0: + config = _combine_config(config=config, config_path=config_path) + + if _query_is_table(table_or_sql): + table_or_sql = f"SELECT * FROM {table_or_sql}" + + if threading.get_native_id not in db2_dict: + db2_dict[threading.get_native_id] = {} + + if Constants.CURSOR not in db2_dict[threading.get_native_id]: + db2_dict[threading.get_native_id][Constants.CURSOR] = None + + if db2_dict[threading.get_native_id][Constants.CURSOR] is None: + connection_string = _create_connection_string(config) + connection = ibm_db.connect(connection_string, "", "") + stmt = ibm_db.prepare(connection, table_or_sql) + + if params: + for i, param in enumerate(params): + ibm_db.bind_param(stmt, i + 1, param) + + ibm_db.execute(stmt) + + db2_dict[threading.get_native_id][Constants.CONNECTION] = connection + db2_dict[threading.get_native_id][Constants.CURSOR] = stmt + db2_dict[threading.get_native_id][Constants.COLUMN_NAMES] = [ + column[Constants.I_COLUMN_NAME] for column in ibm_db.fetch_assoc(stmt) + ] + + +def db2( + table_or_sql: str, + config: mgp.Map, + config_path: str = "", + params: mgp.Nullable[mgp.Any] = None, +) -> mgp.Record(row=mgp.Map): + """ + With migrate.db2 you can access IBM DB2 and execute queries. The result table is converted into a stream, + and returned rows can be used to create or create graph structures. Config must be at least empty map. + If config_path is passed, every key,value pair from JSON file will overwrite any values in config file. + + :param table_or_sql: Table name or an SQL query + :param config: Connection configuration parameters (as in ibm_db.connect), + :param config_path: Path to the JSON file containing configuration parameters (as in ibm_db.connect) + :param params: Optionally, queries may be parameterized. In that case, `params` provides parameter values + :return: The result table as a stream of rows + """ + global db2_dict + cursor = db2_dict[threading.get_native_id][Constants.CURSOR] + column_names = db2_dict[threading.get_native_id][Constants.COLUMN_NAMES] + + rows = [] + for _ in range(Constants.BATCH_SIZE): + row = ibm_db.fetch_assoc(cursor) + if not row: + break + rows.append(row) + + return [mgp.Record(row=_name_row_cells(row, column_names)) for row in rows] + + +def cleanup_migrate_db2(): + global db2_dict + db2_dict[threading.get_native_id][Constants.CURSOR] = None + ibm_db.commit(db2_dict[threading.get_native_id][Constants.CONNECTION]) + ibm_db.close(db2_dict[threading.get_native_id][Constants.CONNECTION]) + db2_dict[threading.get_native_id][Constants.CONNECTION] = None + db2_dict[threading.get_native_id][Constants.COLUMN_NAMES] = None + +mgp.add_batch_read_proc(db2, init_migrate_db2, cleanup_migrate_db2) + + def _query_is_table(table_or_sql: str) -> bool: return len(table_or_sql.split()) == 1 diff --git a/python/requirements.txt b/python/requirements.txt index c64bae23c..199015060 100644 --- a/python/requirements.txt +++ b/python/requirements.txt @@ -13,5 +13,6 @@ mysql-connector-python==8.0.32 oracledb==1.2.2 pyodbc==4.0.35 psycopg2-binary==2.9.9 +ibm-db=3.2.3 defusedxml==0.7.1 scipy==1.12.0 diff --git a/python/requirements_no_ml.txt b/python/requirements_no_ml.txt index 67a1686f6..38dd0be39 100644 --- a/python/requirements_no_ml.txt +++ b/python/requirements_no_ml.txt @@ -11,5 +11,6 @@ mysql-connector-python==8.0.32 oracledb==1.2.2 pyodbc==4.0.35 psycopg2-binary==2.9.9 +ibm-db==3.2.3 defusedxml==0.7.1 scipy==1.12.0 From 8ad186833977a12303113e5d103dfb9099df85b2 Mon Sep 17 00:00:00 2001 From: Josip Mrden Date: Tue, 25 Jun 2024 12:07:57 +0200 Subject: [PATCH 2/4] Fix requirements.txt providing only double equality --- python/requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/requirements.txt b/python/requirements.txt index 199015060..54ea70877 100644 --- a/python/requirements.txt +++ b/python/requirements.txt @@ -13,6 +13,6 @@ mysql-connector-python==8.0.32 oracledb==1.2.2 pyodbc==4.0.35 psycopg2-binary==2.9.9 -ibm-db=3.2.3 +ibm-db==3.2.3 defusedxml==0.7.1 scipy==1.12.0 From bd4a7438f66ed91bd6ccc7dd94d605a926853614 Mon Sep 17 00:00:00 2001 From: Josip Mrden Date: Tue, 25 Jun 2024 14:05:06 +0200 Subject: [PATCH 3/4] Flake and migrate --- python/migrate.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/python/migrate.py b/python/migrate.py index e7ec9cd46..2d5b815f4 100644 --- a/python/migrate.py +++ b/python/migrate.py @@ -346,6 +346,7 @@ def cleanup_migrate_postgresql(): # IBM DB2 dictionary to store connections and cursors by thread db2_dict = {} + def init_migrate_db2( table_or_sql: str, config: mgp.Map, @@ -426,6 +427,7 @@ def cleanup_migrate_db2(): db2_dict[threading.get_native_id][Constants.CONNECTION] = None db2_dict[threading.get_native_id][Constants.COLUMN_NAMES] = None + mgp.add_batch_read_proc(db2, init_migrate_db2, cleanup_migrate_db2) From 9089ab8a9a793e729aefbf3499ae16680fcc00b0 Mon Sep 17 00:00:00 2001 From: Josip Mrden Date: Tue, 25 Jun 2024 17:37:55 +0200 Subject: [PATCH 4/4] Add private method --- python/migrate.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/python/migrate.py b/python/migrate.py index 2d5b815f4..d3a6e9731 100644 --- a/python/migrate.py +++ b/python/migrate.py @@ -464,3 +464,7 @@ def _check_params_type(params: Any, types=(dict, list, tuple)) -> None: raise TypeError( "Database query parameter values must be passed in a container of type List[Any] (or Map, if migrating from MySQL or Oracle DB)" ) + + +def _create_connection_string(config: mgp.Map) -> str: + return ";".join(f"{key}={value}" for key, value in config.items())