Skip to content

Commit

Permalink
Use DateTime64 for timestamp mysql type (instead of string) (#81)
Browse files Browse the repository at this point in the history
  • Loading branch information
bakwc authored Jan 18, 2025
1 parent 507f2e8 commit 97eaf8c
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 4 deletions.
21 changes: 21 additions & 0 deletions mysql_ch_replicator/converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,25 @@ def strip_sql_comments(sql_statement):
return sqlparse.format(sql_statement, strip_comments=True).strip()


def convert_timestamp_to_datetime64(input_str):

# Define the regex pattern
pattern = r'^timestamp(?:\((\d+)\))?$'

# Attempt to match the pattern
match = re.match(pattern, input_str.strip(), re.IGNORECASE)

if match:
# If a precision is provided, include it in the replacement
precision = match.group(1)
if precision is not None:
return f'DateTime64({precision})'
else:
return 'DateTime64'
else:
raise ValueError(f"Invalid input string format: '{input_str}'")


class MysqlToClickhouseConverter:
def __init__(self, db_replicator: 'DbReplicator' = None):
self.db_replicator = db_replicator
Expand Down Expand Up @@ -238,6 +257,8 @@ def convert_type(self, mysql_type, parameters):
return 'Int32'
if 'real' in mysql_type:
return 'Float64'
if mysql_type.startswith('timestamp'):
return convert_timestamp_to_datetime64(mysql_type)
if mysql_type.startswith('time'):
return 'String'
if 'varbinary' in mysql_type:
Expand Down
14 changes: 10 additions & 4 deletions test_mysql_ch_replicator.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import datetime
import os
import shutil
import time
Expand Down Expand Up @@ -887,13 +888,14 @@ def test_different_types_2():
test2 point,
test3 binary(16),
test4 set('1','2','3','4','5','6','7'),
test5 timestamp(0),
PRIMARY KEY (id)
);
''')

mysql.execute(
f"INSERT INTO {TEST_TABLE_NAME} (test1, test2, test3, test4) VALUES "
f"(0, POINT(10.0, 20.0), 'azaza', '1,3,5');",
f"INSERT INTO {TEST_TABLE_NAME} (test1, test2, test3, test4, test5) VALUES "
f"(0, POINT(10.0, 20.0), 'azaza', '1,3,5', '2023-08-15 14:30:00');",
commit=True,
)

Expand All @@ -910,8 +912,8 @@ def test_different_types_2():
assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 1)

mysql.execute(
f"INSERT INTO {TEST_TABLE_NAME} (test1, test2, test4) VALUES "
f"(1, POINT(15.0, 14.0), '2,4,5');",
f"INSERT INTO {TEST_TABLE_NAME} (test1, test2, test4, test5) VALUES "
f"(1, POINT(15.0, 14.0), '2,4,5', '2023-08-15 14:40:00');",
commit=True,
)

Expand All @@ -925,6 +927,10 @@ def test_different_types_2():
assert ch.select(TEST_TABLE_NAME, 'test1=True')[0]['test4'] == '2,4,5'
assert ch.select(TEST_TABLE_NAME, 'test1=False')[0]['test4'] == '1,3,5'

value = ch.select(TEST_TABLE_NAME, 'test1=True')[0]['test5']
assert isinstance(value, datetime.datetime)
assert str(value) == '2023-08-15 14:40:00+00:00'

mysql.execute(
f"INSERT INTO {TEST_TABLE_NAME} (test1, test2) VALUES "
f"(0, NULL);",
Expand Down

0 comments on commit 97eaf8c

Please sign in to comment.