Refactor of 1664: add ability to do efficient blocking based on list/array intersections #1692

Merged · 43 commits · Jan 17, 2024

Changes from 1 commit

Commits (43)
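As a rough illustration of the feature this PR adds (the schema and queries below are invented and use DuckDB directly, not splink's API): blocking on whether two array columns intersect is expensive when expressed as a pairwise predicate, because it must be evaluated against every candidate pair. Unnesting the arrays first turns the same condition into an equi-join on the array elements, so only records sharing at least one element are ever paired.

    # Sketch of array-intersection blocking via unnesting, in DuckDB.
    # Table and column names are invented for illustration.
    import duckdb

    con = duckdb.connect()
    con.execute("""
        create table people as
        select * from (values
            (1, ['smith', 'jones']),
            (2, ['jones']),
            (3, ['taylor'])
        ) t(unique_id, surnames)
    """)

    pairs = con.execute("""
        with unnested as (
            select unique_id, unnest(surnames) as surname from people
        )
        select distinct l.unique_id as unique_id_l, r.unique_id as unique_id_r
        from unnested l
        join unnested r on l.surname = r.surname
        where l.unique_id < r.unique_id
    """).fetchall()

    print(pairs)  # [(1, 2)] -- the only pair whose surname arrays intersect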
ed6e47b
add tests for array-based blocking
nerskin Oct 23, 2023
f3024ef
Add logic for blocking on array intersections by unnesting tables
nerskin Oct 23, 2023
bfea53c
update hardcoded hash in test_correctness_of_convergence.py
nerskin Oct 23, 2023
4399142
Merge branch 'moj-analytical-services:master' into master
nerskin Oct 23, 2023
d3fe4dd
linting/formatting
nerskin Oct 24, 2023
a48b0ec
update table names for consistency with splink conventions
nerskin Oct 24, 2023
3b8222a
Update tests
nerskin Oct 24, 2023
1f1ea76
ensure that table names are unique
nerskin Oct 24, 2023
eacdc04
lint
nerskin Oct 24, 2023
a9076ea
wip
RobinL Nov 1, 2023
8ed833f
wip
RobinL Nov 1, 2023
217f633
move materialisation logic to separate function
RobinL Nov 2, 2023
43298b1
rename for clarity
RobinL Nov 3, 2023
90fadb5
improve clarity of names
RobinL Nov 3, 2023
efb72c3
pushing logic into blockingrule class
RobinL Nov 3, 2023
4caa847
better names
RobinL Nov 6, 2023
4c40ffd
remove materialised tables after use
RobinL Nov 6, 2023
836ea36
is salted
RobinL Nov 6, 2023
46d834b
merge in master
RobinL Nov 6, 2023
fa096cd
fix merge
RobinL Nov 6, 2023
8621d2d
exploding blocking rule class
RobinL Nov 6, 2023
4d40bd9
all logic now pushed into blocking rules classes
RobinL Nov 6, 2023
1117737
better names
RobinL Nov 6, 2023
7c71849
change all files to current master
RobinL Nov 30, 2023
d78bec1
Merge branch 'master' into refactor_ids_to_compare_creation
RobinL Nov 30, 2023
de972f7
Merge branch 'master' into refactor_ids_to_compare_creation
RobinL Nov 30, 2023
da5a499
initial working implementation
RobinL Nov 30, 2023
bba25dc
update
RobinL Nov 30, 2023
24cd2e7
fix spark
RobinL Dec 11, 2023
0b2f338
Merge branch 'master' into refactor_ids_to_compare_creation
RobinL Dec 11, 2023
1d63218
format better
RobinL Dec 11, 2023
12e0c90
fix tests
RobinL Dec 11, 2023
c2aea3c
make work with deterministic link
RobinL Dec 11, 2023
a58deca
put check supported logic in place most likely to be caught
RobinL Dec 11, 2023
4021d3f
gives correct error messages
RobinL Dec 11, 2023
0cede3d
fix tests
RobinL Dec 11, 2023
26bf082
add tests back in
RobinL Dec 11, 2023
c7307eb
fix link type issue and unique ids
RobinL Dec 12, 2023
8aecb27
lint
RobinL Dec 12, 2023
6798cb1
fix line length
RobinL Dec 12, 2023
9ef61f1
rename method for clarity
RobinL Dec 12, 2023
1ece5d7
Merge branch 'master' into refactor_ids_to_compare_creation
RobinL Jan 17, 2024
ca0e202
Move out of loop
RobinL Jan 17, 2024
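One detail worth pulling out of the history above: commit 1f1ea76 ("ensure that table names are unique") introduces the scheme, still visible in the diff below, whereby each materialised id-pair table is suffixed with a short hash of the blocking rule's SQL and the linker's cache uid, so salted variants of the same rule cannot collide. A minimal sketch of that scheme (the function name and inputs here are hypothetical; only the hashing recipe is taken from the diff):

    import hashlib

    def salt_table_suffix(blocking_rule_sql: str, cache_uid: str) -> str:
        # Mirrors the diff: sha256 of rule + cache uid, truncated to 9 hex chars
        to_hash = (blocking_rule_sql + cache_uid).encode("utf-8")
        return "salt_id_" + hashlib.sha256(to_hash).hexdigest()[:9]

    suffix = salt_table_suffix('l."surname" = r."surname"', "9ad8kcl")
    # -> "salt_id_" followed by the first 9 hex characters of the sha256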
wip
RobinL committed Nov 1, 2023
commit a9076ea5df5051c59e5d9b7d00fbae226737105d
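The substance of this "wip" commit, per the diff below, is a two-phase shape for exploded blocking rules: first materialise the distinct (id_l, id_r) candidate pairs produced by the unnested self-join, then left-join those pairs back onto the un-exploded input tables to build the pairwise comparisons. A self-contained sketch of that shape in DuckDB (invented toy schema, not splink's internals):

    import duckdb

    con = duckdb.connect()
    con.execute("""
        create table df as
        select * from (values
            (1, 'alice',  ['a@x.com', 'a@y.com']),
            (2, 'alicia', ['a@y.com']),
            (3, 'bob',    ['b@z.com'])
        ) t(unique_id, name, emails)
    """)

    # Phase 1: materialise the distinct candidate id pairs once
    con.execute("""
        create table ids_to_compare as
        with unnested as (select unique_id, unnest(emails) as email from df)
        select distinct l.unique_id as unique_id_l, r.unique_id as unique_id_r
        from unnested l join unnested r on l.email = r.email
        where l.unique_id < r.unique_id
    """)

    # Phase 2: join the pairs back to recover full records for comparison
    rows = con.execute("""
        select pairs.unique_id_l, l.name as name_l,
               pairs.unique_id_r, r.name as name_r
        from ids_to_compare as pairs
        left join df as l on pairs.unique_id_l = l.unique_id
        left join df as r on pairs.unique_id_r = r.unique_id
    """).fetchall()

    print(rows)  # [(1, 'alice', 2, 'alicia')]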
157 changes: 87 additions & 70 deletions splink/blocking.py
@@ -55,7 +55,7 @@ def __init__(
         self.sqlglot_dialect = sqlglot_dialect
         self.salting_partitions = salting_partitions
         self.arrays_to_explode = arrays_to_explode
-        self.ids_to_compare = []
+        self.ids_to_compare = []  # list of SplinkDataFrames representing ids to compare

     @property
     def sql_dialect(self):
@@ -78,17 +78,15 @@ def exclude_from_following_rules_sql(self, linker: Linker):
         unique_id_column = linker._settings_obj._unique_id_column_name

         if self.ids_to_compare:
-
             ids_to_compare_sql = " union all ".join(
                 [f"select * from {ids.physical_name}" for ids in self.ids_to_compare]
             )
-            # self.ids_to_compare[0].physical_name

             return f"""EXISTS (
                 select 1 from ({ids_to_compare_sql}) as ids_to_compare
                 where (
-                    l.{unique_id_column} = ids_to_compare.{unique_id_column}_l and
-                    r.{unique_id_column} = ids_to_compare.{unique_id_column}_r
+                l.{unique_id_column} = ids_to_compare.{unique_id_column}_l and
+                r.{unique_id_column} = ids_to_compare.{unique_id_column}_r
                 )
             )
             """
@@ -223,8 +221,59 @@ def _sql_gen_where_condition(link_type, unique_id_cols):
     return where_condition


+def materialise_array_exploded_id_lookup(linker: Linker, br, link_type, apply_salt):
+    try:
+        input_dataframe = linker._intermediate_table_cache[
+            "__splink__df_concat_with_tf"
+        ]
+    except KeyError:
+        input_dataframe = linker._initialise_df_concat_with_tf()
+
+    input_colnames = {col.name() for col in input_dataframe.columns}
+    arrays_to_explode_quoted = [
+        InputColumn(colname, sql_dialect=linker._sql_dialect).quote().name()
+        for colname in br.arrays_to_explode
+    ]
+
+    explode_sql = linker._gen_explode_sql(
+        "__splink__df_concat_with_tf",
+        br.arrays_to_explode,
+        list(input_colnames.difference(arrays_to_explode_quoted)),
+    )
+
+    linker._enqueue_sql(
+        f"{explode_sql}",
+        "__splink__df_concat_with_tf_unnested",
+    )
+    unique_id_col = linker.settings_obj._unique_id_column_name
+
+    if link_type == "two_dataset_link_only":
+        where_condition = where_condition + " and l.source_dataset < r.source_dataset"
+
+    # ensure that table names are unique
+    if apply_salt:
+        to_hash = (br + linker._cache_uid).encode("utf-8")
+        salt_id = "salt_id_" + hashlib.sha256(to_hash).hexdigest()[:9]
+    else:
+        salt_id = ""
+
+    linker._enqueue_sql(
+        f"""
+        select distinct
+        l.{unique_id_col} as {unique_id_col}_l,
+        r.{unique_id_col} as {unique_id_col}_r
+        from __splink__df_concat_with_tf_unnested as l
+        inner join __splink__df_concat_with_tf_unnested as r
+        on ({br})
+        {where_condition} {br.and_not_preceding_rules_sql(linker)}""",
+        f"ids_to_compare_blocking_rule_{br.match_key}{salt_id}",
+    )
+    ids_to_compare = linker._execute_sql_pipeline([input_dataframe])
+    br.ids_to_compare.append(ids_to_compare)
+
+
 # flake8: noqa: C901
-def block_using_rules_sql(linker: Linker):
+def block_using_rules(linker: Linker):
     """Use the blocking rules specified in the linker's settings object to
     generate a SQL statement that will create pairwise record comparions
     according to the blocking rule(s).
@@ -285,76 +334,44 @@ def block_using_rules_sql(linker: Linker):
     probability = ""

     sqls = []
-    for br in blocking_rules:
-
+    all_blocking_rules = []
+    for br in blocking_rules:
         # Apply our salted rules to resolve skew issues. If no salt was
         # selected to be added, then apply the initial blocking rule.
         if apply_salt:
-            salted_blocking_rules = br.salted_blocking_rules
+            all_blocking_rules.extend(br.salted_blocking_rules)
         else:
-            salted_blocking_rules = [br.blocking_rule]
-
-        for salted_br in salted_blocking_rules:
-            if not br.arrays_to_explode:
-                sql = f"""
-                select {sql_select_expr}, '{br.match_key}' as match_key
-                {probability}
-                from {linker._input_tablename_l} as l
-                inner join {linker._input_tablename_r} as r
-                on
-                ({salted_br})
-                {where_condition}
-                {br.and_not_preceding_rules_sql(linker)}
-                """
-            else:
-                try:
-                    input_dataframe = linker._intermediate_table_cache[
-                        "__splink__df_concat_with_tf"
-                    ]
-                except KeyError:
-                    input_dataframe = linker._initialise_df_concat_with_tf()
-                input_colnames = {col.name() for col in input_dataframe.columns}
-                arrays_to_explode_quoted = [
-                    InputColumn(colname, sql_dialect=linker._sql_dialect).quote().name()
-                    for colname in br.arrays_to_explode
-                ]
-                linker._enqueue_sql(
-                    f"{linker._gen_explode_sql('__splink__df_concat_with_tf',br.arrays_to_explode,list(input_colnames.difference(arrays_to_explode_quoted)))}",
-                    "__splink__df_concat_with_tf_unnested",
-                )
-                unique_id_col = settings_obj._unique_id_column_name
-
-                if link_type == "two_dataset_link_only":
-                    where_condition = (
-                        where_condition + " and l.source_dataset < r.source_dataset"
-                    )
-
-                # ensure that table names are unique
-                if apply_salt:
-                    to_hash = (salted_br + linker._cache_uid).encode("utf-8")
-                    salt_id = "salt_id_" + hashlib.sha256(to_hash).hexdigest()[:9]
-                else:
-                    salt_id = ""
-
-                linker._enqueue_sql(
-                    f"""
-                    select distinct l.{unique_id_col} as {unique_id_col}_l,r.{unique_id_col} as {unique_id_col}_r
-                    from __splink__df_concat_with_tf_unnested as l inner join __splink__df_concat_with_tf_unnested as r on ({salted_br})
-                    {where_condition} {br.and_not_preceding_rules_sql(linker)}""",
-                    f"ids_to_compare_blocking_rule_{br.match_key}{salt_id}",
-                )
-                ids_to_compare = linker._execute_sql_pipeline([input_dataframe])
-                br.ids_to_compare.append(ids_to_compare)
-                sql = f"""
-                select {sql_select_expr}, '{br.match_key}' as match_key
-                {probability}
-                from {ids_to_compare.physical_name} as pairs
-                left join {linker._input_tablename_l} as l on pairs.{unique_id_col}_l=l.{unique_id_col}
-                left join {linker._input_tablename_r} as r on pairs.{unique_id_col}_r=r.{unique_id_col}
-                """
-            sqls.append(sql)
+            all_blocking_rules.append(br.blocking_rule)
+
+    for br in all_blocking_rules:
+        materialise_array_exploded_id_lookup(linker, br)
+
+    for br in all_blocking_rules:
+        if not br.arrays_to_explode:
+            sql = f"""
+            select
+            {sql_select_expr}
+            , '{br.match_key}' as match_key
+            {probability}
+            from {linker._input_tablename_l} as l
+            inner join {linker._input_tablename_r} as r
+            on
+            ({br})
+            {where_condition}
+            {br.and_not_preceding_rules_sql(linker)}
+            """
+        else:
+            sql = f"""
+            select {sql_select_expr}, '{br.match_key}' as match_key
+            {probability}
+            from {ids_to_compare.physical_name} as pairs
+            left join {linker._input_tablename_l} as l
+            on pairs.{unique_id_col}_l=l.{unique_id_col}
+            left join {linker._input_tablename_r} as r
+            on pairs.{unique_id_col}_r=r.{unique_id_col}
+            """
+        sqls.append(sql)

     if (
         linker._two_dataset_link_only