From d9612d99541789cbe3651007e7c18169f6ab9034 Mon Sep 17 00:00:00 2001 From: Shalisha Witherspoon Date: Tue, 4 Feb 2025 14:04:50 -0500 Subject: [PATCH 1/6] added logging/timestamp to rep_removal phases --- .../rep_removal/dpk_rep_removal/transform.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/transforms/universal/rep_removal/dpk_rep_removal/transform.py b/transforms/universal/rep_removal/dpk_rep_removal/transform.py index 8b505a5c5..5d6d7f95e 100644 --- a/transforms/universal/rep_removal/dpk_rep_removal/transform.py +++ b/transforms/universal/rep_removal/dpk_rep_removal/transform.py @@ -12,6 +12,7 @@ import logging import os import tempfile +import datetime import pyarrow as pa import pandas as pd from dpk_rep_removal.dedup_pq_level import load_pq_docs_once_avoidIO, extract_dup_per_doc_avoidIO_further, save_deduped_pq_once @@ -21,7 +22,7 @@ from dpk_rep_removal.make_suffix_array import make_suffix_array from data_processing.transform import AbstractTableTransform -logging.basicConfig(level=logging.DEBUG) +logging.basicConfig(format='%(asctime)s - %(levelname)s - %(message)s', level=logging.INFO) class RepRemovalTransform(AbstractTableTransform): @@ -48,8 +49,10 @@ def transform(self, table: pa.Table, file_name: str = None) -> tuple[list[pa.Tab try: with tempfile.TemporaryDirectory() as td: save_dir = os.path.join(td, 'save_dir') + logging.info(f"{datetime.datetime.now()}: encoding parquet") encoded_pq = os.path.join(save_dir, self.dedup_level) + load_pq_docs_once_avoidIO(pq_df, self.contents_column_name, save_dir, self.dedup_level, self.tokenize, int(self.num_threads)) @@ -58,10 +61,12 @@ def transform(self, table: pa.Table, file_name: str = None) -> tuple[list[pa.Tab os.makedirs(cache_dir) os.makedirs(temp_dir) + logging.info(f"{datetime.datetime.now()}: making suffix array") make_suffix_array(encoded_pq, temp_dir, self.dedup_level, int(self.num_threads), int(self.num_cpus)) + logging.info(f"{datetime.datetime.now()}: finding repeated substrings") find_repeated_substrings(encoded_pq, self.length_thresh, cache_dir, self.num_threads, self.frequency_threshold, self.retain_first_copy) - + logging.info(f"{datetime.datetime.now()}: collecting duplicates") repeated_pairs = collect_duplicates_avoidIO(encoded_pq, self.length_thresh, cache_dir) # no duplicates found @@ -74,7 +79,7 @@ def transform(self, table: pa.Table, file_name: str = None) -> tuple[list[pa.Tab self.contents_column_name, self.num_threads, self.tokenize) - + logging.info(f"Num Duplicate Rows: {len(repeated_pairs) - 4}") metadata = { "pre_content col size": pre_content_col_size, "rep_removed_content col size": deduped_content_col_size, From eabe17f0825034dbc2a6783824d88d005c9a9f8f Mon Sep 17 00:00:00 2001 From: Shalisha Witherspoon Date: Tue, 4 Feb 2025 16:17:57 -0500 Subject: [PATCH 2/6] updated transforms for logging --- transforms/universal/rep_removal/dpk_rep_removal/transform.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/transforms/universal/rep_removal/dpk_rep_removal/transform.py b/transforms/universal/rep_removal/dpk_rep_removal/transform.py index 5d6d7f95e..eb51b3f06 100644 --- a/transforms/universal/rep_removal/dpk_rep_removal/transform.py +++ b/transforms/universal/rep_removal/dpk_rep_removal/transform.py @@ -43,6 +43,7 @@ def __init__(self, config: dict[str, Any]): else: self.retain_first_copy = True + def transform(self, table: pa.Table, file_name: str = None) -> tuple[list[pa.Table], dict[str, Any]]: """ """ pq_df = table.to_pandas() @@ -51,8 +52,6 @@ def 
transform(self, table: pa.Table, file_name: str = None) -> tuple[list[pa.Tab save_dir = os.path.join(td, 'save_dir') logging.info(f"{datetime.datetime.now()}: encoding parquet") encoded_pq = os.path.join(save_dir, self.dedup_level) - - load_pq_docs_once_avoidIO(pq_df, self.contents_column_name, save_dir, self.dedup_level, self.tokenize, int(self.num_threads)) From 3884776b032125dfe7847e9a6c0a73716461abe5 Mon Sep 17 00:00:00 2001 From: Shalisha Witherspoon Date: Wed, 5 Feb 2025 15:44:12 -0500 Subject: [PATCH 3/6] updated default threads, fixed bug with counting duplicate rows, and removed unused dedup functions --- transforms/universal/rep_removal/README.md | 29 ++- .../dpk_rep_removal/dedup_pq_level.py | 188 ------------------ .../dpk_rep_removal/make_suffix_array.py | 5 +- .../rep_removal/dpk_rep_removal/runtime.py | 2 +- .../rep_removal/dpk_rep_removal/transform.py | 30 ++- 5 files changed, 41 insertions(+), 213 deletions(-) diff --git a/transforms/universal/rep_removal/README.md b/transforms/universal/rep_removal/README.md index d810479f9..3b2384cae 100644 --- a/transforms/universal/rep_removal/README.md +++ b/transforms/universal/rep_removal/README.md @@ -52,16 +52,25 @@ pip install --no-binary :all: psutil ``` -B) Compile the dedup_dataset binary from the **dpk_rep_removal** package dir: -- Install from git clone repo: -```shell -cargo install --path dpk_rep_removal/rust -``` -- Install from pip install (Note: Activate venv before running next commands): -```shell -PACKAGE_LOCATION=$(pip show data_prep_toolkit_transforms | grep Location | awk '{print $2}') -cargo install --path $PACKAGE_LOCATION/dpk_rep_removal/rust -``` +[//]: # (B) Compile the dedup_dataset binary from the **dpk_rep_removal** package dir:) + +[//]: # (- Install from git clone repo:) + +[//]: # (```shell) + +[//]: # (cargo install --path dpk_rep_removal/rust) + +[//]: # (```) + +[//]: # (- Install from pip install (Note: Activate venv before running next commands):) + +[//]: # (```shell) + +[//]: # (PACKAGE_LOCATION=$(pip show data_prep_toolkit_transforms | grep Location | awk '{print $2}')) + +[//]: # (cargo install --path $PACKAGE_LOCATION/dpk_rep_removal/rust) + +[//]: # (```) ## Input Parameters The transform can be initialized with the following parameters: diff --git a/transforms/universal/rep_removal/dpk_rep_removal/dedup_pq_level.py b/transforms/universal/rep_removal/dpk_rep_removal/dedup_pq_level.py index ab78338f9..35dca2b7e 100644 --- a/transforms/universal/rep_removal/dpk_rep_removal/dedup_pq_level.py +++ b/transforms/universal/rep_removal/dpk_rep_removal/dedup_pq_level.py @@ -77,58 +77,6 @@ def decode(x): return out -def load_pq_docs(pq_df, content_col, save_dir, dataset_name, tokenize, num_threads): - global args_tokenize - args_tokenize = tokenize - - pre_sep = b"\xff\xff" - post_sep = b"" - - if not os.path.exists(save_dir): - os.mkdir(save_dir) - - fout = open(os.path.join(save_dir, dataset_name), "wb") - - with mp.get_context("fork").Pool(num_threads) as p: - sizes = [0] - docs_content_text = pq_df[content_col].tolist() - encoded_docs = p.map(encode, docs_content_text) - - for doc in encoded_docs: - next_line = sep() + doc - fout.write(next_line) - sizes.append(sizes[-1] + len(next_line)) - fout.close() - open(os.path.join(save_dir, dataset_name + ".size"), "wb").write(np.array(sizes, dtype=np.uint64).tobytes()) - - -def load_pq_docs_once(pq_df, content_col, save_dir, dataset_name, tokenize, num_threads): - global encoded_docs, loaded_size, args_tokenize - args_tokenize = tokenize - - pre_sep = 
b"\xff\xff" - post_sep = b"" - - if not os.path.exists(save_dir): - os.mkdir(save_dir) - - fout = open(os.path.join(save_dir, dataset_name), "wb") - - with mp.get_context("fork").Pool(num_threads) as p: - loaded_size = [0] - docs_content_text = pq_df[content_col].tolist() - encoded_docs = p.map(encode, docs_content_text) - - for doc in encoded_docs: - next_line = sep() + doc - fout.write(next_line) - loaded_size.append(loaded_size[-1] + len(next_line)) - fout.close() - open(os.path.join(save_dir, dataset_name + ".size"), "wb").write(np.array(loaded_size, dtype=np.uint64).tobytes()) - ### To avoid tokenizing again we pass the tokenized column to use later - # return enc_text, loaded_size - - def load_pq_docs_once_avoidIO(pq_df, content_col, save_dir, dataset_name, tokenize, num_threads): global args_tokenize, encoded_docs, loaded_size args_tokenize = tokenize @@ -158,31 +106,6 @@ def load_pq_docs_once_avoidIO(pq_df, content_col, save_dir, dataset_name, tokeni # return enc_text, loaded_size -def gen_output_doc(args): - global remove_ex, args_tokenize - - this_idx, row = args - - if this_idx in remove_ex: - if args_tokenize: - row = encode(row) - for start, end in remove_ex[this_idx][::-1]: - if start % 2: - start = start - 1 - if end % 2: - end = end + 1 - # print(start,end) - # end = int(end-6) - # print(start,end) - row = row[:start] + row[end:] - row = decode(row) - else: - for start, end in remove_ex[this_idx][::-1]: - # print(start,end) - row = row[:start] + row[end:] - return row - - def gen_output_doc_once(args): global remove_ex, args_tokenize, encoded_docs @@ -208,27 +131,6 @@ def gen_output_doc_once(args): return row -def save_deduped_pq(pq_df, output_dir, content_col, num_threads, tokenize): - global args_tokenize, remove_ex - args_tokenize = tokenize - - # pq_df = pd.read_parquet(input_pq_list) - pre_content_col_size = sum(pq_df[content_col].str.len()) - - ### Removing the repeated subsequences from all parquet docs - docs = [(i, row) for i, row in enumerate(pq_df[content_col])] - p = mp.get_context("fork").Pool(int(num_threads)) - docs = p.map(gen_output_doc, docs) - - pq_df[content_col] = docs - deduped_content_col_size = sum(pq_df[content_col].str.len()) - - #### saving the output parquet file once - pq_df.to_parquet(output_dir) - - return pre_content_col_size, deduped_content_col_size - - def save_deduped_pq_once(pq_df, output_dir, content_col, num_threads, tokenize): global args_tokenize, remove_ex args_tokenize = tokenize @@ -251,96 +153,6 @@ def save_deduped_pq_once(pq_df, output_dir, content_col, num_threads, tokenize): return pre_content_col_size, deduped_content_col_size -def extract_dup_per_doc(size_file, repeated_pairs): - global remove_ex - remove = [] - fin = open(repeated_pairs) - for line in fin: - if 'out' in line: break - for line in fin: - remove.append(list(map(int, line.split()))) - - sizes = np.frombuffer(open(size_file, "rb").read(), dtype=np.uint64) - - remove_ex = defaultdict(list) - - # count_between_docs = 0 - # duplicate_between_docs = [] ### for printing and investigation - ptr = 0 - for i, byte_start in enumerate(sizes[:-1]): - byte_end = sizes[i + 1] - # print(byte_start, byte_end, remove[ptr]) - while ptr < len(remove) and byte_start <= remove[ptr][0] < byte_end: - # print(remove[ptr]) - - ##### if a duplicate is made from two subsequent documents, - ##### Do not remove it as each part might be the only occurrence in its related doc - ##### This follows our strategy to retain the first occurrence of each duplicate - if remove[ptr][1] > byte_end + 
6: - # count_between_docs += 1 - # duplicate_between_docs.append(i) ### for printing and investigation - ptr += 1 - continue ### Do not remove this duplicate - - # The magic value 6 here corresponds to the 4-byte index prefix followed by \xff\xff. - remove_ex[i].append((max(int(remove[ptr][0] - byte_start - 6), 0), - int(min(int(remove[ptr][1] - byte_start), - byte_end - byte_start)) - 6)) ################## added -6 to exclude sep - ptr += 1 - # print ('############# Number of duplicate made from two subsequent documents: ', count_between_docs) - # print ('############# Number of duplicate made from two subsequent documents: ', duplicate_between_docs) - - # df_dict = pd.DataFrame(remove_ex) - # print(remove_ex) - # return remove_ex - - -def extract_dup_per_doc_avoidIO(repeated_pairs): - global remove_ex, loaded_size - remove = [] - fin = open(repeated_pairs) - for line in fin: - if 'out' in line: break - for line in fin: - remove.append(list(map(int, line.split()))) - - ### Avoid I/O process for .size file to speed up the process - # sizes = np.frombuffer(open(size_file, "rb").read(), dtype=np.uint64) - sizes = loaded_size - - remove_ex = defaultdict(list) - - # count_between_docs = 0 - # duplicate_between_docs = [] ### for printing and investigation - ptr = 0 - for i, byte_start in enumerate(sizes[:-1]): - byte_end = sizes[i + 1] - # print(byte_start, byte_end, remove[ptr]) - while ptr < len(remove) and byte_start <= remove[ptr][0] < byte_end: - # print(remove[ptr]) - - ##### if a duplicate is made from two subsequent documents, - ##### Do not remove it as each part might be the only occurrence in its related doc - ##### This follows our strategy to retain the first occurrence of each duplicate - if remove[ptr][1] > byte_end + 6: - # count_between_docs += 1 - # duplicate_between_docs.append(i) ### for printing and investigation - ptr += 1 - continue ### Do not remove this duplicate - - # The magic value 6 here corresponds to the 4-byte index prefix followed by \xff\xff. - remove_ex[i].append((max(int(remove[ptr][0] - byte_start - 6), 0), - int(min(int(remove[ptr][1] - byte_start), - byte_end - byte_start)) - 6)) ################## added -6 to exclude sep - ptr += 1 - # print ('############# Number of duplicate made from two subsequent documents: ', count_between_docs) - # print ('############# Number of duplicate made from two subsequent documents: ', duplicate_between_docs) - - # df_dict = pd.DataFrame(remove_ex) - # print(remove_ex) - # return remove_ex - - def extract_dup_per_doc_avoidIO_further(repeated_pairs): global remove_ex, loaded_size remove = [] diff --git a/transforms/universal/rep_removal/dpk_rep_removal/make_suffix_array.py b/transforms/universal/rep_removal/dpk_rep_removal/make_suffix_array.py index 809d6d499..95c5799aa 100644 --- a/transforms/universal/rep_removal/dpk_rep_removal/make_suffix_array.py +++ b/transforms/universal/rep_removal/dpk_rep_removal/make_suffix_array.py @@ -31,14 +31,13 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-import logging import os import time import subprocess import numpy as np from dpk_rep_removal.utils import calculate_timeout - -logging.basicConfig(level=logging.DEBUG) +from data_processing.utils import get_logger +logging = get_logger(__name__, level="INFO") def make_suffix_array(input, tmp_dir_sub, dedup_level, num_threads, num_cpus): diff --git a/transforms/universal/rep_removal/dpk_rep_removal/runtime.py b/transforms/universal/rep_removal/dpk_rep_removal/runtime.py index e77a53d22..00f0fd193 100644 --- a/transforms/universal/rep_removal/dpk_rep_removal/runtime.py +++ b/transforms/universal/rep_removal/dpk_rep_removal/runtime.py @@ -85,7 +85,7 @@ def add_input_params(self, parser: ArgumentParser) -> None: "--rep_removal_num_threads", type=str, required=False, - default="4", + default=str(cpu_count(logical=False)), help="Value for number of threads to use for processing", ) parser.add_argument( diff --git a/transforms/universal/rep_removal/dpk_rep_removal/transform.py b/transforms/universal/rep_removal/dpk_rep_removal/transform.py index eb51b3f06..f67737764 100644 --- a/transforms/universal/rep_removal/dpk_rep_removal/transform.py +++ b/transforms/universal/rep_removal/dpk_rep_removal/transform.py @@ -9,8 +9,9 @@ # See the License for the specific language governing permissions and # limitations under the License. ################################################################################ -import logging + import os +import subprocess import tempfile import datetime import pyarrow as pa @@ -21,8 +22,8 @@ from psutil import cpu_count from dpk_rep_removal.make_suffix_array import make_suffix_array from data_processing.transform import AbstractTableTransform - -logging.basicConfig(format='%(asctime)s - %(levelname)s - %(message)s', level=logging.INFO) +from data_processing.utils import get_logger +logging = get_logger(__name__, level="INFO") class RepRemovalTransform(AbstractTableTransform): @@ -35,7 +36,7 @@ def __init__(self, config: dict[str, Any]): self.frequency_threshold = config.get("rep_removal_frequency_threshold", str(1)) self.retain_first_copy = str(config.get("rep_removal_retain_first_copy", True)) self.tokenize = str(config.get("rep_removal_tokenize", True)) - self.num_threads = config.get("rep_removal_num_threads", str(4)) + self.num_threads = config.get("rep_removal_num_threads", str(cpu_count(logical=False))) self.num_cpus = config.get("rep_removal_num_cpus", cpu_count(logical=False)) if self.retain_first_copy.lower() == 'false': @@ -44,13 +45,18 @@ def __init__(self, config: dict[str, Any]): else: self.retain_first_copy = True + pwd = os.path.dirname(__file__) + manifest_path = f"{pwd}/rust/" + cmd = f"cargo install --path {manifest_path}" + subprocess.run(cmd, shell=True, capture_output=True, text=True) + def transform(self, table: pa.Table, file_name: str = None) -> tuple[list[pa.Table], dict[str, Any]]: """ """ pq_df = table.to_pandas() try: with tempfile.TemporaryDirectory() as td: save_dir = os.path.join(td, 'save_dir') - logging.info(f"{datetime.datetime.now()}: encoding parquet") + logging.info("encoding parquet") encoded_pq = os.path.join(save_dir, self.dedup_level) load_pq_docs_once_avoidIO(pq_df, self.contents_column_name, save_dir, self.dedup_level, self.tokenize, int(self.num_threads)) @@ -60,16 +66,16 @@ def transform(self, table: pa.Table, file_name: str = None) -> tuple[list[pa.Tab os.makedirs(cache_dir) os.makedirs(temp_dir) - logging.info(f"{datetime.datetime.now()}: making suffix array") + logging.info("making suffix array") 
make_suffix_array(encoded_pq, temp_dir, self.dedup_level, int(self.num_threads), int(self.num_cpus)) - logging.info(f"{datetime.datetime.now()}: finding repeated substrings") + logging.info("finding repeated substrings") find_repeated_substrings(encoded_pq, self.length_thresh, cache_dir, self.num_threads, self.frequency_threshold, self.retain_first_copy) - logging.info(f"{datetime.datetime.now()}: collecting duplicates") + logging.info("collecting duplicates") repeated_pairs = collect_duplicates_avoidIO(encoded_pq, self.length_thresh, cache_dir) # no duplicates found - if repeated_pairs[0] == 'S 0': + if 'out' not in repeated_pairs: return [], {"duplicates_found": 0} extract_dup_per_doc_avoidIO_further(repeated_pairs) @@ -78,11 +84,13 @@ def transform(self, table: pa.Table, file_name: str = None) -> tuple[list[pa.Tab self.contents_column_name, self.num_threads, self.tokenize) - logging.info(f"Num Duplicate Rows: {len(repeated_pairs) - 4}") + + duplicates_found = len(repeated_pairs[repeated_pairs.index('out') + 1:-1]) + logging.info(f"Num Duplicate Rows: {duplicates_found}") metadata = { "pre_content col size": pre_content_col_size, "rep_removed_content col size": deduped_content_col_size, - "duplicates_found": len(repeated_pairs) - 4, + "duplicates_found": duplicates_found, } # add deduped to res table From 46097d5cbd33288ebe91e8d1b2f6f7abcd9198b6 Mon Sep 17 00:00:00 2001 From: Shalisha Witherspoon Date: Thu, 6 Feb 2025 14:49:11 -0500 Subject: [PATCH 4/6] updated per PR comments --- .../rep_removal/dpk_rep_removal/dedup_pq_level.py | 12 ------------ .../rep_removal/dpk_rep_removal/runtime.py | 14 +++++++------- .../rep_removal/dpk_rep_removal/transform.py | 9 ++++----- transforms/universal/rep_removal/rep_removal.ipynb | 12 +----------- 4 files changed, 12 insertions(+), 35 deletions(-) diff --git a/transforms/universal/rep_removal/dpk_rep_removal/dedup_pq_level.py b/transforms/universal/rep_removal/dpk_rep_removal/dedup_pq_level.py index 35dca2b7e..a6d15d6ad 100644 --- a/transforms/universal/rep_removal/dpk_rep_removal/dedup_pq_level.py +++ b/transforms/universal/rep_removal/dpk_rep_removal/dedup_pq_level.py @@ -24,12 +24,9 @@ import pandas as pd import struct from collections import defaultdict -import dpk_rep_removal.utils import transformers from transformers import GPT2Tokenizer -run_in_OCP = True - #### Save the tokenizer in a local path to speed up the process #### Get tokenizer from the local path to speed up the process @@ -81,9 +78,6 @@ def load_pq_docs_once_avoidIO(pq_df, content_col, save_dir, dataset_name, tokeni global args_tokenize, encoded_docs, loaded_size args_tokenize = tokenize - pre_sep = b"\xff\xff" - post_sep = b"" - if not os.path.exists(save_dir): os.mkdir(save_dir) @@ -207,9 +201,3 @@ def extract_dup_per_doc_avoidIO_further(repeated_pairs): int(min(int(remove[ptr][1] - byte_start), byte_end - byte_start)) - 6)) ################## added -6 to exclude sep ptr += 1 - # print ('############# Number of duplicate made from two subsequent documents: ', count_between_docs) - # print ('############# Number of duplicate made from two subsequent documents: ', duplicate_between_docs) - - # df_dict = pd.DataFrame(remove_ex) - # print(remove_ex) - # return remove_ex \ No newline at end of file diff --git a/transforms/universal/rep_removal/dpk_rep_removal/runtime.py b/transforms/universal/rep_removal/dpk_rep_removal/runtime.py index 00f0fd193..da7a71497 100644 --- a/transforms/universal/rep_removal/dpk_rep_removal/runtime.py +++ 
b/transforms/universal/rep_removal/dpk_rep_removal/runtime.py @@ -55,16 +55,16 @@ def add_input_params(self, parser: ArgumentParser) -> None: ) parser.add_argument( "--rep_removal_length_thresh", - type=str, + type=int, required=False, - default="50", + default=50, help="Length threshold for processing", ) parser.add_argument( "--rep_removal_frequency_threshold", - type=str, + type=int, required=False, - default="1", + default=1, help="Frequency threshold for processing.", ) parser.add_argument( @@ -83,14 +83,14 @@ def add_input_params(self, parser: ArgumentParser) -> None: ) parser.add_argument( "--rep_removal_num_threads", - type=str, + type=int, required=False, - default=str(cpu_count(logical=False)), + default=cpu_count(logical=False), help="Value for number of threads to use for processing", ) parser.add_argument( "--rep_removal_num_cpus", - type=str, + type=int, required=False, default=cpu_count(logical=False), help="Value for number of cpus allocated for processing", diff --git a/transforms/universal/rep_removal/dpk_rep_removal/transform.py b/transforms/universal/rep_removal/dpk_rep_removal/transform.py index f67737764..a1118814f 100644 --- a/transforms/universal/rep_removal/dpk_rep_removal/transform.py +++ b/transforms/universal/rep_removal/dpk_rep_removal/transform.py @@ -13,7 +13,6 @@ import os import subprocess import tempfile -import datetime import pyarrow as pa import pandas as pd from dpk_rep_removal.dedup_pq_level import load_pq_docs_once_avoidIO, extract_dup_per_doc_avoidIO_further, save_deduped_pq_once @@ -32,12 +31,12 @@ def __init__(self, config: dict[str, Any]): self.contents_column_name = config.get("rep_removal_contents_column_name", "contents") self.dedup_level = config.get("rep_removal_dedup_level_name", "parquet") - self.length_thresh = config.get("rep_removal_length_thresh", str(50)) - self.frequency_threshold = config.get("rep_removal_frequency_threshold", str(1)) + self.length_thresh = str(config.get("rep_removal_length_thresh", 5)) + self.frequency_threshold = str(config.get("rep_removal_frequency_threshold", 1)) self.retain_first_copy = str(config.get("rep_removal_retain_first_copy", True)) self.tokenize = str(config.get("rep_removal_tokenize", True)) - self.num_threads = config.get("rep_removal_num_threads", str(cpu_count(logical=False))) - self.num_cpus = config.get("rep_removal_num_cpus", cpu_count(logical=False)) + self.num_threads = str(config.get("rep_removal_num_threads", cpu_count(logical=False))) + self.num_cpus = str(config.get("rep_removal_num_cpus", cpu_count(logical=False))) if self.retain_first_copy.lower() == 'false': self.retain_first_copy = False diff --git a/transforms/universal/rep_removal/rep_removal.ipynb b/transforms/universal/rep_removal/rep_removal.ipynb index 1c529765b..afaacff50 100644 --- a/transforms/universal/rep_removal/rep_removal.ipynb +++ b/transforms/universal/rep_removal/rep_removal.ipynb @@ -38,16 +38,6 @@ "set the $PATH to include `/Users/USERNAME/.cargo/bin/`" ] }, - { - "cell_type": "code", - "execution_count": null, - "id": "88527732-fcaf-4fac-9120-43c67dba76d3", - "metadata": {}, - "outputs": [], - "source": [ - "import os" - ] - }, { "cell_type": "code", "execution_count": null, @@ -66,6 +56,7 @@ "outputs": [], "source": [ "# set $PATH env to append the rust path\n", + "import os\n", "os.environ['PATH'] = os.environ['PATH'] + ':/OUTPUT/OF/WHEREIS/CARGO/UP/TO/BIN/'" ] }, @@ -105,7 +96,6 @@ "RepRemoval(input_folder= \"test-data/input\",\n", " output_folder= \"test-data/output\",\n", " 
rep_removal_contents_column_name='text', \n", - " rep_removal_num_threads='1',\n", " ).transform()" ] }, From f49d3e1cccf7efe68697b1e682c279139da5686d Mon Sep 17 00:00:00 2001 From: Shalisha Witherspoon Date: Fri, 7 Feb 2025 11:41:33 -0500 Subject: [PATCH 5/6] updated make_suffix_array for threads and readibility, updated notebook to simplify adding cargo path, and removed 1 thread default in notebooks and tests --- transforms/universal/rep_removal/Makefile | 3 +- transforms/universal/rep_removal/README.md | 21 +- .../dpk_rep_removal/make_suffix_array.py | 246 +++++++++--------- .../universal/rep_removal/rep_removal.ipynb | 26 +- 4 files changed, 141 insertions(+), 155 deletions(-) diff --git a/transforms/universal/rep_removal/Makefile b/transforms/universal/rep_removal/Makefile index 5740780dd..5c110983b 100644 --- a/transforms/universal/rep_removal/Makefile +++ b/transforms/universal/rep_removal/Makefile @@ -20,5 +20,4 @@ run-cli-sample: source venv/bin/activate && \ $(PYTHON) -m dpk_$(TRANSFORM_NAME).runtime \ --data_local_config "{ 'input_folder' : 'test-data/input', 'output_folder' : 'output'}" \ - --rep_removal_contents_column_name 'text' \ - --rep_removal_num_threads '1' + --rep_removal_contents_column_name 'text' diff --git a/transforms/universal/rep_removal/README.md b/transforms/universal/rep_removal/README.md index 3b2384cae..0b28f42df 100644 --- a/transforms/universal/rep_removal/README.md +++ b/transforms/universal/rep_removal/README.md @@ -75,15 +75,15 @@ pip install --no-binary :all: psutil The transform can be initialized with the following parameters: -| Parameter | Default | Description | -|------------------------------------|------------|---------------------------------------------------| -| `rep_removal_contents_column_name` | `contents` | Name of the column holding the document contents | -| `rep_removal_dedup_level_name` | `parquet` | Name of the type of file to process | -| `rep_remova_length_thresh` | `50` | Length threshold for processing | -| `rep_removal_frequency_threshold` | `1` | Frequency threshold for processing | -| `rep_removal_retain_first_copy` | `True` | Boolean value for whether to retain first copy | -| `rep_removal_tokenize` | `True` | Boolean value for whether to tokenize | -| `rep_removal_num_threads` | `4` | Value for number of threads to use for processing | +| Parameter | Default | Description | +|------------------------------------|------------------------------------|---------------------------------------------------| +| `rep_removal_contents_column_name` | `contents` | Name of the column holding the document contents | +| `rep_removal_dedup_level_name` | `parquet` | Name of the type of file to process | +| `rep_remova_length_thresh` | `50` | Length threshold for processing | +| `rep_removal_frequency_threshold` | `1` | Frequency threshold for processing | +| `rep_removal_retain_first_copy` | `True` | Boolean value for whether to retain first copy | +| `rep_removal_tokenize` | `True` | Boolean value for whether to tokenize | +| `rep_removal_num_threads` | `psutils.cpu_count(logical=False)` | Value for number of threads to use for processing | ## Output Format @@ -125,8 +125,7 @@ You can invoke the transform via command line, as shown in sample make command ` ```commandline python -m dpk_rep_removal.runtime \ --data_local_config "{ 'input_folder' : 'test-data/input', 'output_folder' : 'output'}" \ - --rep_removal_contents_column_name 'text' \ - --rep_removal_num_threads '1' + --rep_removal_contents_column_name 'text' ``` diff --git 
a/transforms/universal/rep_removal/dpk_rep_removal/make_suffix_array.py b/transforms/universal/rep_removal/dpk_rep_removal/make_suffix_array.py index 95c5799aa..7224612e0 100644 --- a/transforms/universal/rep_removal/dpk_rep_removal/make_suffix_array.py +++ b/transforms/universal/rep_removal/dpk_rep_removal/make_suffix_array.py @@ -35,141 +35,143 @@ import time import subprocess import numpy as np +import multiprocessing as mp from dpk_rep_removal.utils import calculate_timeout from data_processing.utils import get_logger -logging = get_logger(__name__, level="INFO") +logger = get_logger(__name__, level="INFO") +pwd = os.path.dirname(__file__) +dedup_program = f"{pwd}/rust/target/release/dedup_dataset" -def make_suffix_array(input, tmp_dir_sub, dedup_level, num_threads, num_cpus): - # data_size = os.path.getsize(sys.argv[1]) - data_size = os.path.getsize(input) - - HACK = 100000 - - started = [] +# Determine the number of jobs based on the data size (total jobs, and jobs at once) +def determine_job_parameters(data_size): if data_size > 10e9: - total_jobs = 100 - jobs_at_once = 20 + return 100, 20 elif data_size > 1e9: - total_jobs = 96 - jobs_at_once = 96 + return 96, 96 elif data_size > 10e6: - total_jobs = 4 - jobs_at_once = 4 + return 4, 4 + else: + return 1, 1 + + +# Run a subprocess command and return the output +def run_subprocess(cmd, timeout=None): + try: + if timeout is None: + process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True, text=True) + stdout, stderr = process.communicate() + else: + process = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True, text=True, timeout=timeout) + stderr = process.stderr + stdout = process.stdout + + if process.returncode != 0: + raise Exception(f"Error in subprocess: {stderr}") + return stdout + except Exception as e: + logger.error(f"Error running command '{cmd}': {e}") + return None + + +# Create parts of the dataset +def create_part(data_file, start_byte, end_byte): + cmd = f"{dedup_program} make-part --data-file {data_file} --start-byte {start_byte} --end-byte {end_byte}" + logger.info(f"Creating part: {start_byte}-{end_byte}") + return run_subprocess(cmd) + + +# Calculate expected size using FACT +def get_expected_size(file_path): + size_data = os.path.getsize(file_path) + FACT = np.ceil(np.log(size_data) / np.log(2) / 8) + return size_data * FACT + + +# Check the integrity of the files +def check_file_integrity(data_file, started): + logger.info("Checking file integrity...") + while True: + files = [f"{data_file}.part.{s}-{e}" for s, e in started] + wait = [] + + for file, (s, e) in zip(files, started): + if not os.path.exists(file) or not os.path.exists(f"{file}.table.bin") or os.path.getsize( + f"{file}.table.bin") == 0 or get_expected_size(file) != os.path.getsize(file + ".table.bin"): + logger.warning(f"File missing or invalid: {file}, rerunning.") + wait.append((s, e)) + + if not wait: + break + + logger.info(f"Re-running {len(wait)} jobs due to failed integrity checks.") + with mp.Pool(len(wait)) as pool: + pool.starmap(create_part, [(data_file, s, e) for s, e in wait]) + + time.sleep(1) + + +# Merge the suffix trees +def merge_suffix_trees(files, suffix_array_path, threads, timeout=None): + cmd = f"{dedup_program} merge --output-file {suffix_array_path} --suffix-path {' --suffix-path '.join(files)} --num-threads {threads}" + logger.info("Merging suffix trees...") + result = run_subprocess(cmd, timeout) + if result: + logger.info("Merge successful.") + else: + 
logger.error("Merge failed.") + raise RuntimeError("Merge failed.") + + +# Cleanup and verification of the final table file + +def cleanup_and_verify_final_table(input_file, suffix_array_path, tmp_dir_sub): + logger.info("Final cleanup and verification...") + subprocess.run("cat %s.table.bin.* > %s/out.table.bin" % (suffix_array_path, tmp_dir_sub), shell=True) + subprocess.run("mv %s/out.table.bin %s.table.bin" % (tmp_dir_sub, input_file), shell=True) + # Verify file integrity + if os.path.exists(f"{input_file}.table.bin"): + if os.path.getsize(f"{input_file}.table.bin") % os.path.getsize(input_file) != 0: + logger.error("File size is incorrect.") + raise RuntimeError("File size is incorrect.") else: - total_jobs = 1 - jobs_at_once = 1 + logger.error("Failed to create the table file.") + raise RuntimeError("Failed to create the table file.") + + +def make_suffix_array(input, tmp_dir_sub, dedup_level, num_threads, num_cpus): + HACK = 100000 + data_size = os.path.getsize(input) + total_jobs, jobs_at_once = determine_job_parameters(data_size) + chunk_size = data_size // total_jobs + started = [] + logger.info(f"Starting the deduplication process for file: {input}") - S = data_size // total_jobs timeout = calculate_timeout(data_size, cpu_cores=num_cpus) - logging.info(f"timeout is: {timeout}") + logger.info(f"timeout is: {timeout}") - pwd = os.path.dirname(__file__) - dedup_program = f"{pwd}/rust/target/release/dedup_dataset" + # Create dataset parts in parallel + for jobstart in range(0, total_jobs, jobs_at_once): + wait = [] + for i in range(jobstart, jobstart + jobs_at_once): + start_byte, end_byte = i * chunk_size, min((i + 1) * chunk_size + HACK, data_size) + started.append((start_byte, end_byte)) + wait.append((start_byte, end_byte)) - try: - for jobstart in range(0, total_jobs, jobs_at_once): - wait = [] - for i in range(jobstart, jobstart + jobs_at_once): - s, e = i * S, min((i + 1) * S + HACK, data_size) - # cmd = "./target/debug/dedup_dataset make-part --data-file %s --start-byte %d --end-byte %d"%(sys.argv[1], s, e) - - ########################################################################################################################################### - # cmd = "./target/debug/dedup_dataset make-part --data-file %s --start-byte %d --end-byte %d"%(input, s, e) - cmd = f"{dedup_program}" + " make-part --data-file %s --start-byte %d --end-byte %d" % (input, s, e) - ########################################################################################################################################### - - started.append((s, e)) - #run the command with subprocess and capture the output - result = subprocess.run(cmd, shell=True, capture_output=True, text=True) - wait.append(result) - - if e == data_size: - break - - #Ensure all commands have finished - for result in wait: - if result.returncode != 0: - raise RuntimeError(f"Error occurred: {result.stderr}") - - # check the output of part files and rerun if necessary - while True: - # files = ["%s.part.%d-%d"%(sys.argv[1],s, e) for s,e in started] - files = ["%s.part.%d-%d" % (input, s, e) for s, e in started] - - wait = [] - for x, (s, e) in zip(files, started): - go = False - if not os.path.exists(x): - go = True - else: - size_data = os.path.getsize(x) - FACT = np.ceil(np.log(size_data) / np.log(2) / 8) - if not os.path.exists(x) or not os.path.exists(x + ".table.bin") or os.path.getsize( - x + ".table.bin") == 0 or size_data * FACT != os.path.getsize(x + ".table.bin"): - go = True - if go: - # cmd = "./target/debug/dedup_dataset 
make-part --data-file %s --start-byte %d --end-byte %d"%(sys.argv[1], s, e) - ########################################################################################################################################### - # cmd = "./target/debug/dedup_dataset make-part --data-file %s --start-byte %d --end-byte %d"%(input, s, e) - cmd = f"{dedup_program}" + " make-part --data-file %s --start-byte %d --end-byte %d" % (input, s, e) - ########################################################################################################################################### - - # run the command to recreate the missing or failed parts - result = subprocess.run(cmd, shell=True, capture_output=True, text=True) - wait.append(result) - if len(wait) >= jobs_at_once: - break - - # Ensure all commands have finished - for result in wait: - if result.returncode != 0: - raise RuntimeError(f"Error occurred: {result.stderr}") - - time.sleep(1) - # break the loop when no jobs are left - if len(wait) == 0: - break - - #os.popen("rm tmp/out.table.bin.*").read() - - torun = " --suffix-path ".join(files) - # pipe = os.popen("./target/debug/dedup_dataset merge --output-file %s --suffix-path %s --num-threads %d"%("tmp/out.table.bin", torun, num_threads)) - - #### Saving suffix arrays in a sub folder (part of the input file name is used for sub folder name) - #### to avoid conflicts in parallel processes on the same node - suffix_array_path = os.path.join(tmp_dir_sub, dedup_level) - - ########################################################################################################################################### - # pipe = os.popen("./target/debug/dedup_dataset merge --output-file %s --suffix-path %s --num-threads %d"%(suffix_array_path, torun,num_threads )) - cmd = f"{dedup_program}" + " merge --output-file %s --suffix-path %s --num-threads %d" % ( - suffix_array_path, torun, num_threads) - ########################################################################################################################################### - - # run the merge command: - logging.info("running the merge") - result = subprocess.run(cmd, shell=True, capture_output=True, text=True, timeout=timeout) - if result.returncode != 0: - raise RuntimeError("Something went wrong with merging.") - - #### Saving suffix arrays in a sub folder (part of the input file name is used for sub folder name) - #### to avoid conflicts in parallel processes on the same node - subprocess.run("cat %s.table.bin.* > %s/out.table.bin" % (suffix_array_path, tmp_dir_sub), shell=True) - - subprocess.run("mv %s/out.table.bin %s.table.bin" % (tmp_dir_sub, input), shell=True) - - logging.info('merging complete') - # if os.path.exists(sys.argv[1]+".table.bin"): - if os.path.exists(input + ".table.bin"): - if os.path.getsize(input + ".table.bin") % os.path.getsize(input) != 0: - raise RuntimeError("File size is wrong") + logger.info(f"Scheduling {jobs_at_once} jobs to create dataset parts.") + with mp.Pool(jobs_at_once) as pool: + pool.starmap(create_part, [(input, s, e) for s, e in wait]) - else: - raise RuntimeError("Failed to create table") + # Check the integrity of all created parts + check_file_integrity(input, started) + + # Merging the parts into the final dataset + suffix_array_path = os.path.join(tmp_dir_sub, dedup_level) + files = [f"{input}.part.{s}-{e}" for s, e in started] + merge_suffix_trees(files, suffix_array_path, num_threads, timeout) - except subprocess.TimeoutExpired: - raise RuntimeError("subprocess timed out. 
skipping file") + # Final cleanup and verification + cleanup_and_verify_final_table(input, suffix_array_path, tmp_dir_sub) - except subprocess.CalledProcessError: - raise RuntimeError("error during subprocess call. skipping file") + logger.info("Deduplication process completed successfully.") diff --git a/transforms/universal/rep_removal/rep_removal.ipynb b/transforms/universal/rep_removal/rep_removal.ipynb index afaacff50..a6982fc44 100644 --- a/transforms/universal/rep_removal/rep_removal.ipynb +++ b/transforms/universal/rep_removal/rep_removal.ipynb @@ -28,14 +28,7 @@ "***Rust*** is required to be installed on the system locally in order to run. To install, review here: https://www.rust-lang.org/tools/install\n", "\n", "### Add Rust to $PATH\n", - "If Rust is **not** added to your `$PATH`, run the below steps to add the rust installation location for proper execution. \n", - "\n", - "You can use the `!whereis cargo` command to find where rust is installed in your machine, and **set the path there up to the `/bin`**\n", - "\n", - "ex: whereis cargo produces:\n", - "cargo: /Users/USERNAME/.cargo/bin/cargo\n", - "\n", - "set the $PATH to include `/Users/USERNAME/.cargo/bin/`" + "If Rust is **not** added to your `$PATH`, run the cell below to add the rust installation location for proper execution. \n" ] }, { @@ -45,19 +38,12 @@ "metadata": {}, "outputs": [], "source": [ - "!whereis cargo" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "eaed73c0-95b8-42de-9cac-a0cdf19ad35b", - "metadata": {}, - "outputs": [], - "source": [ - "# set $PATH env to append the rust path\n", + "import pathlib\n", "import os\n", - "os.environ['PATH'] = os.environ['PATH'] + ':/OUTPUT/OF/WHEREIS/CARGO/UP/TO/BIN/'" + "\n", + "result = !whereis cargo\n", + "cargo_path = os.path.join(pathlib.Path(result[0].split(' ')[1]).parent, '')\n", + "os.environ['PATH'] = os.environ['PATH'] + f':{cargo_path}'" ] }, { From bfdea9bdf24fb4edff0a9daf4c47b57d79cb2ed6 Mon Sep 17 00:00:00 2001 From: Shalisha Witherspoon Date: Fri, 7 Feb 2025 12:03:22 -0500 Subject: [PATCH 6/6] removed thread setting in pytests --- .../universal/rep_removal/test/test_rep_removal_python.py | 3 --- transforms/universal/rep_removal/test/test_rep_removal_ray.py | 1 - 2 files changed, 4 deletions(-) diff --git a/transforms/universal/rep_removal/test/test_rep_removal_python.py b/transforms/universal/rep_removal/test/test_rep_removal_python.py index 8936c35fe..1e6766a3e 100644 --- a/transforms/universal/rep_removal/test/test_rep_removal_python.py +++ b/transforms/universal/rep_removal/test/test_rep_removal_python.py @@ -24,7 +24,6 @@ def test_rep_removal(self): RepRemoval(input_folder=basedir + "/input", output_folder=basedir + "/output", rep_removal_contents_column_name='text', - rep_removal_num_threads='1', ).transform() table1 = pq.read_table(os.path.join(basedir, 'expected', 'test1.parquet')) @@ -36,7 +35,6 @@ def test_wrong_contents_field(self): RepRemoval(input_folder=basedir + "/input", output_folder=basedir + "/output", rep_removal_contents_column_name='contents', - rep_removal_num_threads='1', ).transform() with open(os.path.join(basedir, 'output', 'metadata.json'), 'r') as f: @@ -47,7 +45,6 @@ def test_remove_first_copy(self): RepRemoval(input_folder=basedir + "/input", output_folder=basedir + "/output", rep_removal_contents_column_name='text', - rep_removal_num_threads='1', rep_removal_retain_first_copy=False, ).transform() diff --git a/transforms/universal/rep_removal/test/test_rep_removal_ray.py 
b/transforms/universal/rep_removal/test/test_rep_removal_ray.py index 600f213cb..56449c455 100644 --- a/transforms/universal/rep_removal/test/test_rep_removal_ray.py +++ b/transforms/universal/rep_removal/test/test_rep_removal_ray.py @@ -28,7 +28,6 @@ def get_test_transform_fixtures(self) -> list[tuple]: transform_config = { "run_locally": True, "rep_removal_contents_column_name": 'text', - "rep_removal_num_threads": '1', } launcher = RayTransformLauncher(RepRemovalRayTransformConfiguration())
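
A minimal usage sketch of the transform as it stands at the end of this series: the thread count now defaults to psutil.cpu_count(logical=False) and the Rust dedup_dataset binary is compiled at transform init via cargo, so only the contents column needs to be overridden for the sample data. The import path and folder names below mirror the repository's notebook and tests but are assumptions, not part of this patch set.

```python
# Sketch only: assumes the dpk_rep_removal package (and a Rust/cargo toolchain) is
# installed, and that RepRemoval is exposed from dpk_rep_removal.runtime as in the
# repository's notebook and test examples.
from dpk_rep_removal.runtime import RepRemoval

RepRemoval(
    input_folder="test-data/input",            # illustrative input path
    output_folder="test-data/output",          # illustrative output path
    rep_removal_contents_column_name="text",   # column holding the document text
    # rep_removal_num_threads is deliberately omitted: after this series it
    # defaults to psutil.cpu_count(logical=False).
).transform()
```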