From d8c47d42ab2980be9323e761753d2fc7f4e06f95 Mon Sep 17 00:00:00 2001 From: Chenghao Mou Date: Sat, 25 Nov 2023 17:17:56 +0000 Subject: [PATCH] add v2 scripts --- near_deduplication/README.md | 6 + near_deduplication/bigcode-v2/intra_dedup.py | 552 ++++++++++++++++++ .../bigcode-v2/log4j.properties | 23 + near_deduplication/bigcode-v2/run.sh | 93 +++ near_deduplication/bigcode-v2/run_local.sh | 19 + 5 files changed, 693 insertions(+) create mode 100644 near_deduplication/bigcode-v2/intra_dedup.py create mode 100644 near_deduplication/bigcode-v2/log4j.properties create mode 100755 near_deduplication/bigcode-v2/run.sh create mode 100755 near_deduplication/bigcode-v2/run_local.sh diff --git a/near_deduplication/README.md b/near_deduplication/README.md index f0f7a2b..0d896ef 100644 --- a/near_deduplication/README.md +++ b/near_deduplication/README.md @@ -2,6 +2,12 @@ This is our implementation of near deduplication for BigCode dataset. It is largely evolved from the [original repo](https://github.com/bigcode-project/bigcode-analysis/tree/main/data_analysis/near-deduplication). +## V2 + +We use Google Dataproc and Cloud Storage for the deduplication. The actual script to run is at `bigcode-v2/run.sh`. Feel free to update the parameters in the script to run on your own dataset. + +## V1.* + ### Setup ```` diff --git a/near_deduplication/bigcode-v2/intra_dedup.py b/near_deduplication/bigcode-v2/intra_dedup.py new file mode 100644 index 0000000..3b4566a --- /dev/null +++ b/near_deduplication/bigcode-v2/intra_dedup.py @@ -0,0 +1,552 @@ +#!/usr/bin/env python +# @Date : 2023-08-12 22:18:30 +# @Author : Chenghao Mou (mouchenghao@gmail.com) + +import argparse +import math +import re +import sys +import time +import warnings +from logging import Logger +from typing import List +from typing import Set +from typing import Tuple + +with warnings.catch_warnings(): + warnings.simplefilter("ignore") + import numpy as np + import numpy.typing as npt + import pyspark + import xxhash + from graphframes import GraphFrame # type: ignore + from pyspark import SparkConf + from pyspark.sql import DataFrame + from pyspark.sql import SparkSession + from pyspark.sql import functions as F + from scipy.integrate import quad as integrate + +SEED = 42 +RNG = np.random.RandomState(SEED) +NON_ALPHA = re.compile(r"\W", re.UNICODE) +DTYPE = np.uint32 +MAX_HASH = 4_294_967_295 # maximum 32-bit unsigned integer +MOD_PRIME = 4_294_967_291 # maximum 32-bit prime number + + +def generate_edges(nodes: List[int]) -> List[Tuple[int, int]]: + """ + Generate edges from a cluster. Instead of generating N^2 edges, we only need all nodes align to a single node, since + we will be running connected components on the edges later. + + Parameters + ---------- + nodes : List[int] + The list of nodes in the cluster. + + Returns + ------- + List[Tuple[int, int]] + The list of edges. + + Examples + -------- + >>> generate_edges([1, 2, 3]) + [(2, 1), (3, 1)] + """ + if len(nodes) <= 1: + return [] + + min_node = min(nodes) + return [(n, min_node) for n in nodes if n != min_node] + + +# region: Hashing +def ngrams(content: str, n: int, min_length: int = 5) -> Set[int]: + """ + Return the ngrams in hash values. This function fuses few steps together for performance reasons. + + Parameters + ---------- + content : str + The content of the document. + n : int + The length of each ngram. + min_length : int, optional + The minimum length of each ngram, by default 5 + + Returns + ------- + Set[int] + The set of ngrams in hash values. 
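+
+    Notes
+    -----
+    Tokens come from lower-casing the content and splitting on the ``NON_ALPHA`` regex.
+    Documents with fewer than ``min_length`` tokens return an empty set, while documents
+    shorter than ``n`` tokens still yield a single, shorter n-gram via ``max(1, ...)``.
+    Each n-gram is hashed with ``xxhash.xxh32_intdigest``, so the result is a set of
+    32-bit integers rather than strings.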
+ + Examples + -------- + >>> sorted(list(ngrams("a b c d", 2, min_length=1))) + [145323813, 433422276, 459146835] + >>> list(ngrams("a b c d", 2, min_length=5)) + [] + >>> list(ngrams("a b", 3, min_length=1)) + [433422276] + """ + tokens: List[str] = NON_ALPHA.split(content.lower()) + if len(tokens) < min_length: + return set() + + ng: Set[str] = {" ".join(tokens[i : i + n]) for i in range(0, max(1, len(tokens) - n + 1))} + return {xxhash.xxh32_intdigest(n) for n in ng} + + +def generate_hash_values( + content: str, + idx: int, + num_perm: int, + ngram_size: int, + min_length: int, + hashranges: List[Tuple[int, int]], + permutations: Tuple[npt.NDArray[DTYPE], npt.NDArray[DTYPE]], +) -> List[Tuple[int, bytes, int]]: + """ + Generate the MinHashLSH values for a given document. + + Parameters + ---------- + content : str + The content of the document. + idx : int + The index of the document. + num_perm : int + The number of permutations. + ngram_size : int + The size of the n-grams. + min_length : int + The minimum number of tokens in a document. + hashranges : list + The ranges of offsets for each hash value. + permutations : Tuple[np.ndarray, np.ndarray] + The permutations for the hash values. + + Returns + ------- + List[Tuple[int, bytes, int]] + The list of (band_idx, hash value, idx) for the document. + + Examples + -------- + >>> content = "hello world" + >>> idx = 0 + >>> num_perm = 250 + >>> ngram_size = 1 + >>> hashranges = [(i, i + 25) for i in range(0, 250, 25)] + >>> PERMUTATIONS = ( + ... RNG.randint(1, MOD_PRIME, size=(num_perm,), dtype=DTYPE), + ... RNG.randint(0, MOD_PRIME, size=(num_perm,), dtype=DTYPE), + ... ) + >>> res = generate_hash_values(content, idx, num_perm, ngram_size, 0, hashranges, PERMUTATIONS) + >>> len(res) + 10 + >>> sum(len(h) for _, h, _ in res) == len(res) * 25 * np.dtype(DTYPE).itemsize + True + """ + a, b = permutations + hashes = np.array(list(ngrams(content, ngram_size, min_length)), dtype=DTYPE) + p_hashes = ((np.outer(hashes, a) + b) % MOD_PRIME) & MAX_HASH + min_hashes = np.vstack([p_hashes, np.full(num_perm, MAX_HASH, dtype=DTYPE)]).min(axis=0) + return [(band_idx, min_hashes[start:end].data.tobytes(), idx) for band_idx, (start, end) in enumerate(hashranges)] + + +# endregion + + +# region: MinHashLSH +def optimal_param( + threshold: float, + num_perm: int, + false_positive_weight: float = 0.5, + false_negative_weight: float = 0.5, +): + """ + Compute the optimal `MinHashLSH` parameter that minimizes the weighted sum + of probabilities of false positive and false negative, taken from datasketch. + + Parameters + ---------- + threshold : float + The threshold for similarity. + num_perm : int + The number of permutations. + false_positive_weight : float + The weight of false positive. + false_negative_weight : float + The weight of false negative. + + Returns + ------- + Tuple[int, int] + The optimal `b` and `r` parameters. + The number of bands, and the number of rows per band respectively. 
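+
+    Notes
+    -----
+    The search enumerates every (b, r) pair with b * r <= num_perm, integrates the
+    false-positive probability over [0, threshold] and the false-negative probability
+    over [threshold, 1], and returns the pair that minimizes their weighted sum.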
+ + Examples + -------- + >>> optimal_param(0.7, 256) + (25, 10) + """ + + def false_positive_area(threshold: float, b: int, r: int): + """Source: `datasketch.lsh`""" + + def area(s): + return 1 - (1 - s ** float(r)) ** float(b) + + a, _ = integrate(area, 0.0, threshold) + return a + + def false_negative_area(threshold: float, b: int, r: int): + """Source: `datasketch.lsh`""" + + def area(s): + return 1 - (1 - (1 - s ** float(r)) ** float(b)) + + a, _ = integrate(area, threshold, 1.0) + return a + + min_error = float("inf") + opt = (0, 0) + for b in range(1, num_perm + 1): + max_r = int(num_perm / b) + for r in range(1, max_r + 1): + fp = false_positive_area(threshold, b, r) + fn = false_negative_area(threshold, b, r) + error = fp * false_positive_weight + fn * false_negative_weight + if error < min_error: + min_error = error + opt = (b, r) + return opt + + +# endregion + + +# region: IO +def partitioned_save(df: DataFrame, chunk_size: int, max_partitions: int, output: str): + """ + Save a Spark DataFrame to a GCS directory in batches of `chunk_size` rows. PySpark natively does not support this + functionality, so this workaround is necessary. + + Parameters + ---------- + df : pyspark.sql.DataFrame + The Spark DataFrame to save. + chunk_size : int + The number of rows per batch. + max_partitions : int + The maximum number of partitions. + output : str + The GCS output directory. + + Raises + ------ + RuntimeError + If the save fails. + """ + + total_rows = df.count() + partitions = max(256, min(math.ceil(total_rows / chunk_size), max_partitions)) + + ( + df.repartition(partitions) + .withColumn("__pid__", F.spark_partition_id()) + .write.partitionBy("__pid__") + .parquet(output, mode="overwrite", compression="snappy") + ) + + +# endregion + + +if __name__ == "__main__": # pragma: no cover + # region: Argument Parsing + parser = argparse.ArgumentParser(description="Intra-dataset near-deduplicating with PySpark") + parser.add_argument("--input", "-i", type=str, required=True, help="GCS input directory of parquet files") + parser.add_argument("--threshold", type=float, default=0.7, help="Similarity threshold") + parser.add_argument("--ngram_size", type=int, default=5, help="N-gram size") + parser.add_argument("--min_length", type=int, default=5, help="Minimum token length of document to be considered") + parser.add_argument("--num_perm", type=int, default=250, help="Number of permutations") + parser.add_argument("--b", type=int, default=None, help="Number of bands") + parser.add_argument("--r", type=int, default=None, help="Number of rows per band") + parser.add_argument("--column", "-c", type=str, default="content", help="Column to deduplicate on") + parser.add_argument("--repo_column", type=str, required=True, help="Code repo column") + parser.add_argument("--output", "-o", type=str, required=True, help="GCS output directory of parquet files") + parser.add_argument("--rank", action="store_true", help="Rank the duplicates by quality indicators") + parser.add_argument("--debug", action="store_true", help="Enable debug mode") + parser.add_argument("--profile", action="store_true", help="Enable profiling") + parser.add_argument("--profile_dir", type=str, default="./profile", help="Checkpoint directory") + parser.add_argument("--checkpoint_dir", type=str, default="./checkpoints", help="Checkpoint directory") + args = parser.parse_args() + # endregion + + # region: Spark Configuration + conf = ( + SparkConf() + .set("spark.app.name", "MinHashLSH") + 
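+        # The settings below are sized for the large Dataproc cluster in run.sh:
+        # high shuffle parallelism for the banding/grouping stage and a raised
+        # broadcast-join threshold for the final joins. Scale them down for local
+        # or small-cluster runs.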
.set("spark.sql.execution.arrow.pyspark.enabled", "true") + .set("spark.storage.memoryFraction", "1") + .set("spark.default.parallelism", "100") + .set("spark.sql.autoBroadcastJoinThreshold", "20485760") + .set("spark.sql.broadcastTimeout", "3600") + .set("spark.sql.shuffle.partitions", "8192") + .set("spark.python.profile", "true" if args.profile else "false") + ) + spark = SparkSession.Builder().config(conf=conf).getOrCreate() + sc = spark.sparkContext + sc.setCheckpointDir(args.checkpoint_dir) + log: Logger = spark.sparkContext._jvm.org.apache.log4j.LogManager.getLogger(__name__) # type: ignore + # endregion + + # region: Global Variables + FINAL_SIZE: int = 0 + MAX_WRITE_CHUNK_SIZE: int = 200_000 + MAX_WRITE_PARTITIONS: int = 2048 + + B, R = args.b, args.r + if B is None or R is None: + B, R = optimal_param(args.threshold, args.num_perm) + + HASH_RANGES: List[Tuple[int, int]] = [(i * R, (i + 1) * R) for i in range(B)] + PERMUTATIONS: Tuple[npt.NDArray[DTYPE], npt.NDArray[DTYPE]] = ( + RNG.randint(1, MOD_PRIME, size=(args.num_perm,), dtype=DTYPE), + RNG.randint(0, MOD_PRIME, size=(args.num_perm,), dtype=DTYPE), + ) + # endregion + + start_time: float = time.time() + + # region: Data Loading + # persist justification: this data will be needed when removing duplicates + df: DataFrame = ( + spark.read.option("mergeSchema", "true") + .parquet(args.input) + .withColumn("__id__", F.monotonically_increasing_id()) + .persist(pyspark.StorageLevel.DISK_ONLY) + ) + # persist trigger + DATA_SIZE: int = df.count() + log.debug("-" * 120) + log.debug(f"Using {B=}, {R=}") + log.debug(f"Loaded documents: {DATA_SIZE}") + log.debug(f"{args.input=}") + log.debug(f"{args.output=}") + log.debug(f"{args.threshold=}") + log.debug(f"{args.ngram_size=}") + log.debug(f"{args.min_length=}") + log.debug(f"{args.num_perm=}") + log.debug(f"{args.column=}") + log.debug(f"{args.repo_column=}") + for col, dtype in df.dtypes: + log.debug(f"{col:<64}: {dtype}") + log.debug("-" * 120) + + if DATA_SIZE == 0: + log.debug("No data found.") + exit(0) + # endregion + + # region: MinHash + edges: pyspark.RDD = ( + df.select("__id__", args.column) + .rdd.flatMap( + lambda x: generate_hash_values( + content=x[1], # args.column + idx=x[0], # __id__ + num_perm=args.num_perm, + ngram_size=args.ngram_size, + min_length=args.min_length, + hashranges=HASH_RANGES, + permutations=PERMUTATIONS, + ) + ) # (band_idx, band hash value, idx) + .groupBy(lambda x: (x[0], x[1])) # group by (band_idx, band hash value), potential bottleneck + .flatMap(lambda x: generate_edges([ele[2] for ele in x[1]])) + .distinct() + ).persist(pyspark.StorageLevel.DISK_ONLY) + log.debug(f"Initial edges: {edges.count()}") + + # endregion + + # region: Connected Components + + if edges.isEmpty(): + partitioned_save(df, MAX_WRITE_CHUNK_SIZE, MAX_WRITE_PARTITIONS, args.output) + df.unpersist() + edges.unpersist() + + log.debug("-" * 120) + log.debug("No duplicates found.") + log.debug(f"Data Output: {args.output}") + log.debug(f"Time: {time.time() - start_time:.2f}s") + log.debug("-" * 120) + + sys.exit(0) + + with warnings.catch_warnings(): + warnings.simplefilter("ignore") + edges_df: DataFrame = ( + spark.createDataFrame(edges, schema=["src", "dst"]) + .repartition(4096) + .persist(pyspark.StorageLevel.DISK_ONLY) + ) + log.debug(f"Edges DataFrame: {edges_df.count()}") + vertices_df: DataFrame = ( + edges_df.select(F.col("src").alias("id")) + .union(edges_df.select(F.col("dst").alias("id"))) + .distinct() + .repartition(4096) + 
.persist(pyspark.StorageLevel.DISK_ONLY) + ) + log.debug(f"Vertices DataFrame: {vertices_df.count()}") + assignment: DataFrame = ( + GraphFrame(vertices_df, edges_df).connectedComponents().persist(pyspark.StorageLevel.DISK_ONLY) + ) + log.debug(f"Assignment DataFrame: {assignment.count()}") + edges_df.unpersist() + vertices_df.unpersist() + # endregion + + # region: Merge Results + # justification: this is needed for final output + df = df.join( + assignment.select(F.col("id").alias("__id__"), F.col("component").alias("__component__")), + on="__id__", + how="left", + ).persist(pyspark.StorageLevel.DISK_ONLY) + assignment.unpersist() + log.debug(f"Merging records: {df.count()}") + # endregion + + # region: Quality Control and Data Removal + # + # This section is hard-coded for The Stack + # + # A repo's quality is measured by, in order of importance: + # 1. The number of stars (higher is better) + # 2. The number of forks (higher is better) + # + # A file's quality is therefore measured by the quality of its repo to prioritize + # the integrity of the repo so training context can be maximized at the repo level. + # directory_id object + # blob_id object + # content_id object + # path object + # length int64 + # content object + # src_encoding object + # language object + # is_vendor bool + # is_generated bool + # blob_prefix object + # repo_name object + # repo_url object + # snapshot_id object + # revision_id object + # branch_name object + # visit_date datetime64[ns] + # revision_date datetime64[ns] + # committer_date datetime64[ns] + # github_id float64 + # star_events_count int64 + # fork_events_count int64 + # gha_license_id object + # gha_fork object + # gha_event_created_at datetime64[ns] + # gha_created_at datetime64[ns] + # gha_updated_at datetime64[ns] + # gha_pushed_at datetime64[ns] + # gha_size float64 + # gha_stargazers_count float64 + # gha_forks_count float64 + # gha_open_issues_count float64 + # gha_language object + # gha_archived object + # gha_disabled object + # detected_licenses object + # license_type object + + if args.rank: + rank_columns = [ + "__component__", + "__id__", + args.repo_column, + "revision_date", + "visit_date", + "fork_events_count", + "star_events_count", + "license_type", + ] + # justification: this is needed for the ranking + duplicates: pyspark.RDD = ( + df.filter(F.col("__component__").isNotNull()) + .select(*rank_columns) + .rdd.map(lambda x: (x[0], x)) + .persist(pyspark.StorageLevel.DISK_ONLY) + ) + + def compare_records(a, b): + return sorted( + [a, b], + key=lambda x: ( + # license_type, the more permissive the better + ["permissive", "no_license", "non_permissive"].index(x[-1]) if x[-1] is not None else float("inf"), + # star_events_count, the more the better + -x[-2] if x[-2] is not None else 0.0, + # fork_events_count, the more the better + -x[-3] if x[-3] is not None else 0.0, + # revision_date, the latest the better + -np.datetime64(x[-5]).astype(np.uint64).item() if x[-5] is not None else float("inf"), + # visit_date, the latest the better + -np.datetime64(x[-4]).astype(np.uint64).item() if x[-4] is not None else float("inf"), + ), + )[0] + + log.debug(f"Ranking duplicates: {duplicates.count()}") + flags: pyspark.RDD = ( + duplicates.reduceByKey(lambda x, y: compare_records(x, y)) + .map(lambda x: (x[1][1], True)) + .persist(pyspark.StorageLevel.DISK_ONLY) + ) + log.debug(f"Keeping duplicates: {flags.count()}") + duplicates.unpersist() + + df = ( + df.join(spark.createDataFrame(flags, schema=["__id__", "__keep__"]), on="__id__", 
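+            # left join: rows outside any duplicate cluster keep __component__ = null
+            # and pass the filter below; clustered rows survive only when flagged as
+            # the best record in their component (__keep__ = true)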
how="left") + .filter(F.col("__component__").isNull() | F.col("__keep__")) + .drop("__keep__", "__component__") + .persist(pyspark.StorageLevel.DISK_ONLY) + ) + FINAL_SIZE = df.count() + flags.unpersist() + else: + df = ( + df.filter(F.col("__component__").isNull() | (F.col("__component__") == F.col("__id__"))) + .drop("__component__") + .persist(pyspark.StorageLevel.DISK_ONLY) + ) + FINAL_SIZE = df.count() + + # endregion + + # region: Output + partitioned_save(df, MAX_WRITE_CHUNK_SIZE, MAX_WRITE_PARTITIONS, args.output) + df.unpersist() + + # endregion + + log.debug("-" * 120) + log.debug(f"Number of rows before: {DATA_SIZE}") + log.debug(f"Number of rows after: {FINAL_SIZE}") + log.debug(f"Percentage of rows kept: {FINAL_SIZE / max(0, DATA_SIZE) * 100:.2f}%") + log.debug(f"Output: {args.output}") + log.debug(f"Time: {time.time() - start_time:.2f}s") + log.debug("-" * 120) + + if args.profile: + sc.dump_profiles(args.profile_dir) diff --git a/near_deduplication/bigcode-v2/log4j.properties b/near_deduplication/bigcode-v2/log4j.properties new file mode 100644 index 0000000..b48d917 --- /dev/null +++ b/near_deduplication/bigcode-v2/log4j.properties @@ -0,0 +1,23 @@ +#Define root logger options +log4j.rootLogger=ERROR, console +log4j.logger.__main__=DEBUG + +#Define console appender +log4j.appender.console=org.apache.log4j.ConsoleAppender +logrj.appender.console.Target=System.out +log4j.appender.console.layout=org.apache.log4j.PatternLayout +log4j.appender.console.layout.ConversionPattern=%-5p %c{1} - %m%n + +#Define rolling file appender +log4j.appender.file=org.apache.log4j.RollingFileAppender +log4j.appender.file.File=logs/main.log +log4j.appender.file.Append=true +log4j.appender.file.ImmediateFlush=true +log4j.appender.file.MaxFileSize=10MB +log4j.appender.file.MaxBackupIndex=5 +log4j.appender.file.layout=org.apache.log4j.PatternLayout +log4j.appender.file.layout.ConversionPattern=%d %d{Z} [%t] %-5p (%F:%L) - %m%n + +#setting additivity +log4j.additivity.com.journaldev.log4j=false +log4j.additivity.com.journaldev.log4j.logic=false diff --git a/near_deduplication/bigcode-v2/run.sh b/near_deduplication/bigcode-v2/run.sh new file mode 100755 index 0000000..3373ccc --- /dev/null +++ b/near_deduplication/bigcode-v2/run.sh @@ -0,0 +1,93 @@ +#!/bin/bash +# -*- coding: utf-8 -*- +# @Date : 2023-09-02 10:30:06 +# @Author : Chenghao Mou (mouchenghao@gmail.com) + +CLUSTER_NAME="" +PROJECT_ID="" +REGION="" +CONTAINER="" +DIRECTORY="" +CHECKPOINT_DIR="hdfs:///tmp/checkpoints" +NUM_WORKERS=24 +MASTER_MACHINE_TYPE="" +MASTER_BOOT_DISK_SIZE=1024 +WORKER_MACHINE_TYPE="" +WORKER_BOOT_DISK_SIZE=2048 +IMAGE_VERSION="2.0-debian10" +SPARK_JARS="gs://spark-lib/bigquery/spark-3.2-bigquery-0.32.2.jar" +THRESHOLD=0.7 +REPO_COLUMN="repo_url" + +DEDUPED_DIRECTORY="${DIRECTORY}_deduped" +# DEDUPED_INDEX_DIRECTORY="${DEDUPED_DIRECTORY}_index" +DIRS=("f_star") +# DIRS=$(cat dirs.list) + +# Create cluster if it doesn't exist +if ! 
gcloud dataproc clusters list --region $REGION | grep -q $CLUSTER_NAME; then + gcloud dataproc clusters create $CLUSTER_NAME \ + --enable-component-gateway \ + --region $REGION \ + --zone "" \ + --master-machine-type $MASTER_MACHINE_TYPE \ + --master-boot-disk-size $MASTER_BOOT_DISK_SIZE \ + --num-workers $NUM_WORKERS \ + --worker-machine-type $WORKER_MACHINE_TYPE \ + --worker-boot-disk-size $WORKER_BOOT_DISK_SIZE \ + --image-version $IMAGE_VERSION \ + --project $PROJECT_ID \ + --properties=^#^dataproc:conda.packages='scipy==1.10.1'#dataproc:pip.packages='xxhash==3.3.0' +fi + +# Start cluster if it's not running +if ! gcloud dataproc clusters list --region $REGION | grep -q RUNNING | grep -q $CLUSTER_NAME; then + gcloud dataproc clusters start $CLUSTER_NAME --region $REGION +fi + +# Progress bar +TOTAL=$(echo "${DIRS}" | wc -w) +LENGTH=20 +i=0 + +echo "Total number of directories: $TOTAL" +for DIR in $DIRS; do + # Progress bar + echo -n "[ " + curr_pos=$((i * LENGTH / TOTAL)) + for ((k = 0; k <= curr_pos; k++)); do echo -n "==="; done + for ((j = k + 1; j <= LENGTH; j++)); do echo -n " "; done + v=$(((i + 1) * 100 / TOTAL)) + echo -n " ] " + echo "$v %" $'\r' + ((i++)) + + DIR=${DIR%/} + INPUT_GCS_PATH="${CONTAINER}/${DIRECTORY}/${DIR}" + LAN=$(echo "$DIR" | rev | cut -d'/' -f1 | rev) + OUTPUT_GCS_PATH="${CONTAINER}/${DEDUPED_DIRECTORY}/${LAN}" + # OUTPUT_INDEX_GCS_PATH="${CONTAINER}/${DEDUPED_INDEX_DIRECTORY}/${LAN}" + # OUTPUT_STATUS_GCS_PATH="${OUTPUT_GCS_PATH}/_SUCCESS" + # result=$(gsutil stat "${OUTPUT_STATUS_GCS_PATH}" 2>&1 | grep -c "No URLs matched") + # if [[ $result != 1 ]]; then + # echo "Skipping ${LAN}" + # continue + # fi + echo "Processing ${LAN}" + + gcloud dataproc jobs submit pyspark --cluster ${CLUSTER_NAME} \ + --region $REGION \ + --jars $SPARK_JARS \ + --driver-log-levels root=FATAL,__main__=DEBUG \ + --properties="spark.executor.memory=210g,spark.driver.memory=16g,spark.executor.cores=59,spark.jars.packages=graphframes:graphframes:0.8.2-spark3.2-s_2.12" \ + intra_dedup.py -- \ + --input "$INPUT_GCS_PATH" \ + --output "$OUTPUT_GCS_PATH" \ + --checkpoint_dir "$CHECKPOINT_DIR" \ + --threshold $THRESHOLD \ + --repo_column $REPO_COLUMN \ + --rank + +done + +gcloud dataproc clusters stop $CLUSTER_NAME --region $REGION diff --git a/near_deduplication/bigcode-v2/run_local.sh b/near_deduplication/bigcode-v2/run_local.sh new file mode 100755 index 0000000..ea7d271 --- /dev/null +++ b/near_deduplication/bigcode-v2/run_local.sh @@ -0,0 +1,19 @@ +#!/bin/bash +# -*- coding: utf-8 -*- +# @Date : 2023-09-02 10:30:06 +# @Author : Chenghao Mou (mouchenghao@gmail.com) + +/Users/chenghao/Downloads/spark-3.5.0-bin-hadoop3/bin/spark-submit \ + --executor-memory 16g \ + --driver-memory 20g \ + --executor-cores 3 \ + --num-executors 2 \ + --packages graphframes:graphframes:0.8.2-spark3.2-s_2.12 \ + --conf "spark.executor.extraJavaOptions=-Dlog4j.configuration=./log4j.properties" \ + --conf "spark.driver.extraJavaOptions=-Dlog4j.configuration=./log4j.properties" \ + --conf "spark.python.profile=true" \ + intra_dedup.py\ + --input "" \ + --output "" \ + --threshold 0.7 \ + --debug
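+
+# Note: the hard-coded spark-submit path is specific to one local Spark 3.5.0
+# install, and --input/--output are blank placeholders; point them at local
+# parquet directories (and adjust memory/core settings) before running.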