Skip to content

Commit

Permalink
Quote db_name and table_name (#92)
Browse files Browse the repository at this point in the history
  • Loading branch information
bakwc authored Jan 23, 2025
1 parent 1932617 commit 174222a
Show file tree
Hide file tree
Showing 6 changed files with 122 additions and 122 deletions.
18 changes: 9 additions & 9 deletions mysql_ch_replicator/clickhouse_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@


CREATE_TABLE_QUERY = '''
CREATE TABLE {if_not_exists} {db_name}.{table_name}
CREATE TABLE {if_not_exists} `{db_name}`.`{table_name}`
(
{fields},
`_version` UInt64,
Expand All @@ -26,7 +26,7 @@
'''

DELETE_QUERY = '''
DELETE FROM {db_name}.{table_name} WHERE ({field_name}) IN ({field_values})
DELETE FROM `{db_name}`.`{table_name}` WHERE ({field_name}) IN ({field_values})
'''


Expand Down Expand Up @@ -126,8 +126,8 @@ def execute_command(self, query):
time.sleep(ClickhouseApi.RETRY_INTERVAL)

def recreate_database(self):
self.execute_command(f'DROP DATABASE IF EXISTS {self.database}')
self.execute_command(f'CREATE DATABASE {self.database}')
self.execute_command(f'DROP DATABASE IF EXISTS `{self.database}`')
self.execute_command(f'CREATE DATABASE `{self.database}`')

def get_last_used_version(self, table_name):
return self.tables_last_record_version.get(table_name, 0)
Expand Down Expand Up @@ -210,9 +210,9 @@ def insert(self, table_name, records, table_structure: TableStructure = None):
records_to_insert.append(tuple(record) + (current_version,))
current_version += 1

full_table_name = table_name
full_table_name = f'`table_name`'
if '.' not in full_table_name:
full_table_name = f'{self.database}.{table_name}'
full_table_name = f'`{self.database}`.`{table_name}`'

duration = 0.0
for attempt in range(ClickhouseApi.MAX_RETRIES):
Expand Down Expand Up @@ -258,10 +258,10 @@ def erase(self, table_name, field_name, field_values):
)

def drop_database(self, db_name):
self.execute_command(f'DROP DATABASE IF EXISTS {db_name}')
self.execute_command(f'DROP DATABASE IF EXISTS `{db_name}`')

def create_database(self, db_name):
self.cursor.execute(f'CREATE DATABASE {db_name}')
self.cursor.execute(f'CREATE DATABASE `{db_name}`')

def select(self, table_name, where=None, final=None):
query = f'SELECT * FROM {table_name}'
Expand All @@ -282,7 +282,7 @@ def query(self, query: str):
return self.client.query(query)

def show_create_table(self, table_name):
return self.client.query(f'SHOW CREATE TABLE {table_name}').result_rows[0][0]
return self.client.query(f'SHOW CREATE TABLE `{table_name}`').result_rows[0][0]

def get_system_setting(self, name):
results = self.select('system.settings', f"name = '{name}'")
Expand Down
10 changes: 5 additions & 5 deletions mysql_ch_replicator/converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -498,7 +498,7 @@ def __convert_alter_table_add_column(self, db_name, table_name, tokens):
column_after,
)

query = f'ALTER TABLE {db_name}.{table_name} ADD COLUMN {column_name} {column_type_ch}'
query = f'ALTER TABLE `{db_name}`.`{table_name}` ADD COLUMN `{column_name}` {column_type_ch}'
if column_first:
query += ' FIRST'
else:
Expand All @@ -525,7 +525,7 @@ def __convert_alter_table_drop_column(self, db_name, table_name, tokens):
mysql_table_structure.remove_field(field_name=column_name)
ch_table_structure.remove_field(field_name=column_name)

query = f'ALTER TABLE {db_name}.{table_name} DROP COLUMN {column_name}'
query = f'ALTER TABLE `{db_name}`.`{table_name}` DROP COLUMN {column_name}'
if self.db_replicator:
self.db_replicator.clickhouse_api.execute_command(query)

Expand Down Expand Up @@ -556,7 +556,7 @@ def __convert_alter_table_modify_column(self, db_name, table_name, tokens):
TableField(name=column_name, field_type=column_type_ch),
)

query = f'ALTER TABLE {db_name}.{table_name} MODIFY COLUMN {column_name} {column_type_ch}'
query = f'ALTER TABLE `{db_name}`.`{table_name}` MODIFY COLUMN `{column_name}` {column_type_ch}'
if self.db_replicator:
self.db_replicator.clickhouse_api.execute_command(query)

Expand Down Expand Up @@ -592,7 +592,7 @@ def __convert_alter_table_change_column(self, db_name, table_name, tokens):
TableField(name=column_name, field_type=column_type_ch),
)

query = f'ALTER TABLE {db_name}.{table_name} MODIFY COLUMN {column_name} {column_type_ch}'
query = f'ALTER TABLE `{db_name}`.`{table_name}` MODIFY COLUMN {column_name} {column_type_ch}'
self.db_replicator.clickhouse_api.execute_command(query)

if column_name != new_column_name:
Expand All @@ -602,7 +602,7 @@ def __convert_alter_table_change_column(self, db_name, table_name, tokens):
curr_field_mysql.name = new_column_name
curr_field_clickhouse.name = new_column_name

query = f'ALTER TABLE {db_name}.{table_name} RENAME COLUMN {column_name} TO {new_column_name}'
query = f'ALTER TABLE `{db_name}`.`{table_name}` RENAME COLUMN {column_name} TO {new_column_name}'
self.db_replicator.clickhouse_api.execute_command(query)

def parse_create_table_query(self, mysql_query) -> tuple[TableStructure, TableStructure]:
Expand Down
2 changes: 1 addition & 1 deletion mysql_ch_replicator/db_optimizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ def optimize_table(self, db_name, table_name):
logger.info(f'Optimizing table {db_name}.{table_name}')
t1 = time.time()
self.clickhouse_api.execute_command(
f'OPTIMIZE TABLE {db_name}.{table_name} FINAL SETTINGS mutations_sync = 2'
f'OPTIMIZE TABLE `{db_name}`.`{table_name}` FINAL SETTINGS mutations_sync = 2'
)
t2 = time.time()
logger.info(f'Optimize finished in {int(t2-t1)} seconds')
Expand Down
10 changes: 5 additions & 5 deletions mysql_ch_replicator/db_replicator.py
Original file line number Diff line number Diff line change
Expand Up @@ -242,15 +242,15 @@ def perform_initial_replication(self):
logger.info(f'initial replication - swapping database')
if self.target_database in self.clickhouse_api.get_databases():
self.clickhouse_api.execute_command(
f'RENAME DATABASE {self.target_database} TO {self.target_database}_old',
f'RENAME DATABASE `{self.target_database}` TO `{self.target_database}_old`',
)
self.clickhouse_api.execute_command(
f'RENAME DATABASE {self.target_database_tmp} TO {self.target_database}',
f'RENAME DATABASE `{self.target_database_tmp}` TO `{self.target_database}`',
)
self.clickhouse_api.drop_database(f'{self.target_database}_old')
else:
self.clickhouse_api.execute_command(
f'RENAME DATABASE {self.target_database_tmp} TO {self.target_database}',
f'RENAME DATABASE `{self.target_database_tmp}` TO `{self.target_database}`',
)
self.clickhouse_api.database = self.target_database
logger.info(f'initial replication - done')
Expand Down Expand Up @@ -519,7 +519,7 @@ def handle_drop_table_query(self, query, db_name):

if table_name in self.state.tables_structure:
self.state.tables_structure.pop(table_name)
self.clickhouse_api.execute_command(f'DROP TABLE {"IF EXISTS" if if_exists else ""} {db_name}.{table_name}')
self.clickhouse_api.execute_command(f'DROP TABLE {"IF EXISTS" if if_exists else ""} `{db_name}`.`{table_name}`')

def handle_rename_table_query(self, query, db_name):
tokens = query.split()
Expand All @@ -545,7 +545,7 @@ def handle_rename_table_query(self, query, db_name):
if src_table_name in self.state.tables_structure:
self.state.tables_structure[dest_table_name] = self.state.tables_structure.pop(src_table_name)

ch_clauses.append(f"{src_db_name}.{src_table_name} TO {dest_db_name}.{dest_table_name}")
ch_clauses.append(f"`{src_db_name}`.`{src_table_name}` TO `{dest_db_name}`.`{dest_table_name}`")
self.clickhouse_api.execute_command(f'RENAME TABLE {", ".join(ch_clauses)}')

def log_stats_if_required(self):
Expand Down
2 changes: 1 addition & 1 deletion mysql_ch_replicator/mysql_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ def drop_table(self, table_name):
self.cursor.execute(f'DROP TABLE IF EXISTS `{table_name}`')

def create_database(self, db_name):
self.cursor.execute(f'CREATE DATABASE {db_name}')
self.cursor.execute(f'CREATE DATABASE `{db_name}`')

def execute(self, command, commit=False, args=None):
if args:
Expand Down
Loading

0 comments on commit 174222a

Please sign in to comment.