From 77c0d5f44d6233158d97bf15956b7294fb3ece8f Mon Sep 17 00:00:00 2001
From: ADBond <48208438+ADBond@users.noreply.github.com>
Date: Tue, 23 Jul 2024 17:49:47 +0100
Subject: [PATCH] remove reference to SparkLinker and update code snippet

---
 .../performance/optimising_spark.md      |  8 +++--
 docs/topic_guides/performance/salting.md | 34 +++++++++----------
 2 files changed, 21 insertions(+), 21 deletions(-)

diff --git a/docs/topic_guides/performance/optimising_spark.md b/docs/topic_guides/performance/optimising_spark.md
index 97b336eadf..6389653c9f 100644
--- a/docs/topic_guides/performance/optimising_spark.md
+++ b/docs/topic_guides/performance/optimising_spark.md
@@ -25,9 +25,10 @@ For a cluster with 10 CPUs, that outputs about 8GB of data in parquet format, th
 spark.conf.set("spark.default.parallelism", "50")
 spark.conf.set("spark.sql.shuffle.partitions", "50")
 
-linker = SparkLinker(
+linker = Linker(
     person_standardised_nodes,
     settings,
+    db_api=spark_api,
     break_lineage_method="parquet",
     num_partitions_on_repartition=80,
 )
@@ -45,9 +46,10 @@ Splink will automatically break lineage in sensible places. We have found in pra
 You can do this using the `break_lineage_method` parameter as follows:
 
 ```
-linker = SparkLinker(
+linker = Linker(
     person_standardised_nodes,
     settings,
+    db_api=db_api,
     break_lineage_method="parquet"
 )
 
@@ -78,7 +80,7 @@ In general, increasing parallelism will make Spark 'chunk' your job into a large
 
 ## Repartition after blocking
 
-For some jobs, setting `repartition_after_blocking=True` when you initialise the `SparkLinker` may improve performance.
+For some jobs, setting `repartition_after_blocking=True` when you initialise the `SparkAPI` may improve performance.
 
 ## Salting
 
diff --git a/docs/topic_guides/performance/salting.md b/docs/topic_guides/performance/salting.md
index 106d37342e..80080ff93b 100644
--- a/docs/topic_guides/performance/salting.md
+++ b/docs/topic_guides/performance/salting.md
@@ -17,7 +17,7 @@ Further information about the motivation for salting can be found [here](https:/
 
 ## How to use salting
 
-To enable salting using the `SparkLinker`, you provide some of your blocking rules as a dictionary rather than a string.
+To enable salting using the `Linker` with Spark, you provide some of your blocking rules as a dictionary rather than a string.
 
 This enables you to choose the number of salts for each blocking rule.
 
@@ -25,14 +25,14 @@ Blocking rules provided as plain strings default to no salting (`salting_partiti
 
 The following code snippet illustrates:
 
-```
+```py
 import logging
 
-from pyspark.context import SparkContext, SparkConf
+from pyspark.context import SparkConf, SparkContext
 from pyspark.sql import SparkSession
 
-from splink.spark.linker import SparkLinker
-from splink.spark.comparison_library import levenshtein_at_thresholds, exact_match
+import splink.comparison_library as cl
+from splink import Linker, SparkAPI, splink_datasets
 
 conf = SparkConf()
 conf.set("spark.driver.memory", "12g")
@@ -41,7 +41,7 @@ conf.set("spark.default.parallelism", "8")
 
 sc = SparkContext.getOrCreate(conf=conf)
 spark = SparkSession(sc)
-
+spark.sparkContext.setCheckpointDir("./tmp_checkpoints")
 
 settings = {
     "probability_two_random_records_match": 0.01,
@@ -51,29 +51,27 @@ settings = {
         {"blocking_rule": "l.first_name = r.first_name", "salting_partitions": 4},
     ],
     "comparisons": [
-        levenshtein_at_thresholds("first_name", 2),
-        exact_match("surname"),
-        exact_match("dob"),
-        exact_match("city", term_frequency_adjustments=True),
-        exact_match("email"),
+        cl.LevenshteinAtThresholds("first_name", 2),
+        cl.ExactMatch("surname"),
+        cl.ExactMatch("dob"),
+        cl.ExactMatch("city").configure(term_frequency_adjustments=True),
+        cl.ExactMatch("email"),
     ],
     "retain_matching_columns": True,
     "retain_intermediate_calculation_columns": True,
-    "additional_columns_to_retain": ["group"],
+    "additional_columns_to_retain": ["cluster"],
     "max_iterations": 1,
    "em_convergence": 0.01,
 }
 
-df = spark.read.csv("./tests/datasets/fake_1000_from_splink_demos.csv", header=True)
+df = splink_datasets.fake_1000
 
-
-linker = SparkLinker(df, settings)
+spark_api = SparkAPI(spark_session=spark)
+linker = Linker(df, settings, db_api=spark_api)
 
 logging.getLogger("splink").setLevel(5)
 
-linker.load_settings(settings)
-linker.deterministic_link()
-
+linker.inference.deterministic_link()
 ```
 
 And we can see that salting has been applied by looking at the SQL generated in the log:
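For reviewers, here is a rough assembly of what the salting snippet reads like after this patch, gathered from the added lines above into a single self-contained sketch. The `"link_type"` key is an assumption (it is not visible in the hunk context), the settings shown are only those the diff touches or displays, and the Spark session setup is copied from the snippet itself.

```py
import logging

from pyspark.context import SparkConf, SparkContext
from pyspark.sql import SparkSession

import splink.comparison_library as cl
from splink import Linker, SparkAPI, splink_datasets

# Spark session setup, as in the updated docs snippet.
conf = SparkConf()
conf.set("spark.driver.memory", "12g")
conf.set("spark.default.parallelism", "8")
sc = SparkContext.getOrCreate(conf=conf)
spark = SparkSession(sc)
spark.sparkContext.setCheckpointDir("./tmp_checkpoints")

settings = {
    "link_type": "dedupe_only",  # assumed; not shown in the diff context
    "probability_two_random_records_match": 0.01,
    "blocking_rules_to_generate_predictions": [
        # Supplying a dict with "salting_partitions" rather than a plain SQL
        # string is what enables salting for this blocking rule.
        {"blocking_rule": "l.first_name = r.first_name", "salting_partitions": 4},
    ],
    "comparisons": [
        cl.LevenshteinAtThresholds("first_name", 2),
        cl.ExactMatch("surname"),
        cl.ExactMatch("dob"),
        cl.ExactMatch("city").configure(term_frequency_adjustments=True),
        cl.ExactMatch("email"),
    ],
    "retain_matching_columns": True,
    "retain_intermediate_calculation_columns": True,
    "additional_columns_to_retain": ["cluster"],
    "max_iterations": 1,
    "em_convergence": 0.01,
}

df = splink_datasets.fake_1000

# The pattern this patch documents: a SparkAPI wrapping the SparkSession is
# passed to the generic Linker via db_api, replacing the removed SparkLinker.
spark_api = SparkAPI(spark_session=spark)
linker = Linker(df, settings, db_api=spark_api)

# Verbose logging so the generated (salted) SQL appears in the log.
logging.getLogger("splink").setLevel(5)

linker.inference.deterministic_link()
```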