Merge pull request #1016 from swith005/cargo_fix
Cargo fix
touma-I authored Feb 10, 2025
2 parents 2dca254 + bfdea9b commit e3ce06e
Showing 9 changed files with 191 additions and 400 deletions.
3 changes: 1 addition & 2 deletions transforms/universal/rep_removal/Makefile
@@ -20,5 +20,4 @@ run-cli-sample:
source venv/bin/activate && \
$(PYTHON) -m dpk_$(TRANSFORM_NAME).runtime \
--data_local_config "{ 'input_folder' : 'test-data/input', 'output_folder' : 'output'}" \
-    --rep_removal_contents_column_name 'text' \
-    --rep_removal_num_threads '1'
+    --rep_removal_contents_column_name 'text'
50 changes: 29 additions & 21 deletions transforms/universal/rep_removal/README.md
@@ -52,29 +52,38 @@ pip install --no-binary :all: psutil

```

-B) Compile the dedup_dataset binary from the **dpk_rep_removal** package dir:
-- Install from git clone repo:
-```shell
-cargo install --path dpk_rep_removal/rust
-```
-- Install from pip install (Note: Activate venv before running next commands):
-```shell
-PACKAGE_LOCATION=$(pip show data_prep_toolkit_transforms | grep Location | awk '{print $2}')
-cargo install --path $PACKAGE_LOCATION/dpk_rep_removal/rust
-```
+[//]: # (B) Compile the dedup_dataset binary from the **dpk_rep_removal** package dir:)
+
+[//]: # (- Install from git clone repo:)
+
+[//]: # (```shell)
+
+[//]: # (cargo install --path dpk_rep_removal/rust)
+
+[//]: # (```)
+
+[//]: # (- Install from pip install (Note: Activate venv before running next commands):)
+
+[//]: # (```shell)
+
+[//]: # (PACKAGE_LOCATION=$(pip show data_prep_toolkit_transforms | grep Location | awk '{print $2}'))
+
+[//]: # (cargo install --path $PACKAGE_LOCATION/dpk_rep_removal/rust)
+
+[//]: # (```)
## Input Parameters

The transform can be initialized with the following parameters:

-| Parameter                          | Default    | Description                                        |
-|------------------------------------|------------|----------------------------------------------------|
-| `rep_removal_contents_column_name` | `contents` | Name of the column holding the document contents   |
-| `rep_removal_dedup_level_name`     | `parquet`  | Name of the type of file to process                |
-| `rep_remova_length_thresh`         | `50`       | Length threshold for processing                    |
-| `rep_removal_frequency_threshold`  | `1`        | Frequency threshold for processing                 |
-| `rep_removal_retain_first_copy`    | `True`     | Boolean value for whether to retain first copy     |
-| `rep_removal_tokenize`             | `True`     | Boolean value for whether to tokenize              |
-| `rep_removal_num_threads`          | `4`        | Value for number of threads to use for processing  |
+| Parameter                           | Default                            | Description                                        |
+|-------------------------------------|------------------------------------|----------------------------------------------------|
+| `rep_removal_contents_column_name`  | `contents`                         | Name of the column holding the document contents   |
+| `rep_removal_dedup_level_name`      | `parquet`                          | Name of the type of file to process                |
+| `rep_removal_length_thresh`         | `50`                               | Length threshold for processing                    |
+| `rep_removal_frequency_threshold`   | `1`                                | Frequency threshold for processing                 |
+| `rep_removal_retain_first_copy`     | `True`                             | Boolean value for whether to retain first copy     |
+| `rep_removal_tokenize`              | `True`                             | Boolean value for whether to tokenize              |
+| `rep_removal_num_threads`           | `psutil.cpu_count(logical=False)`  | Value for number of threads to use for processing  |
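
A minimal sketch of overriding these values on the command line is shown below. It assumes each remaining parameter is exposed as a `--rep_removal_*` flag, in the same way as `--rep_removal_contents_column_name` and `--rep_removal_num_threads` in the invocation examples in this README; the values are illustrative only.

```commandline
python -m dpk_rep_removal.runtime \
    --data_local_config "{ 'input_folder' : 'test-data/input', 'output_folder' : 'output'}" \
    --rep_removal_contents_column_name 'text' \
    --rep_removal_frequency_threshold '1' \
    --rep_removal_retain_first_copy 'True' \
    --rep_removal_num_threads '2'
```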


## Output Format
@@ -116,8 +125,7 @@ You can invoke the transform via command line, as shown in sample make command `
```commandline
python -m dpk_rep_removal.runtime \
--data_local_config "{ 'input_folder' : 'test-data/input', 'output_folder' : 'output'}" \
-    --rep_removal_contents_column_name 'text' \
-    --rep_removal_num_threads '1'
+    --rep_removal_contents_column_name 'text'
```
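
For completeness, a sketch of an equivalent Python invocation follows. It assumes the runtime also exposes a `RepRemoval` class that accepts the same `rep_removal_*` parameters as keyword arguments; the class name and signature are assumptions here and are not confirmed by this diff.

```python
# Sketch only: RepRemoval and its keyword arguments are assumed, not confirmed
# by this commit; check dpk_rep_removal.runtime in the installed package.
from dpk_rep_removal.runtime import RepRemoval

RepRemoval(
    input_folder="test-data/input",
    output_folder="output",
    rep_removal_contents_column_name="text",
).transform()
```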

200 changes: 0 additions & 200 deletions transforms/universal/rep_removal/dpk_rep_removal/dedup_pq_level.py
@@ -24,12 +24,9 @@
import pandas as pd
import struct
from collections import defaultdict
import dpk_rep_removal.utils
import transformers
from transformers import GPT2Tokenizer

run_in_OCP = True

#### Save the tokenizer in a local path to speed up the process
#### Get tokenizer from the local path to speed up the process

@@ -77,65 +74,10 @@ def decode(x):
return out


def load_pq_docs(pq_df, content_col, save_dir, dataset_name, tokenize, num_threads):
global args_tokenize
args_tokenize = tokenize

pre_sep = b"\xff\xff"
post_sep = b""

if not os.path.exists(save_dir):
os.mkdir(save_dir)

fout = open(os.path.join(save_dir, dataset_name), "wb")

with mp.get_context("fork").Pool(num_threads) as p:
sizes = [0]
docs_content_text = pq_df[content_col].tolist()
encoded_docs = p.map(encode, docs_content_text)

for doc in encoded_docs:
next_line = sep() + doc
fout.write(next_line)
sizes.append(sizes[-1] + len(next_line))
fout.close()
open(os.path.join(save_dir, dataset_name + ".size"), "wb").write(np.array(sizes, dtype=np.uint64).tobytes())


def load_pq_docs_once(pq_df, content_col, save_dir, dataset_name, tokenize, num_threads):
global encoded_docs, loaded_size, args_tokenize
args_tokenize = tokenize

pre_sep = b"\xff\xff"
post_sep = b""

if not os.path.exists(save_dir):
os.mkdir(save_dir)

fout = open(os.path.join(save_dir, dataset_name), "wb")

with mp.get_context("fork").Pool(num_threads) as p:
loaded_size = [0]
docs_content_text = pq_df[content_col].tolist()
encoded_docs = p.map(encode, docs_content_text)

for doc in encoded_docs:
next_line = sep() + doc
fout.write(next_line)
loaded_size.append(loaded_size[-1] + len(next_line))
fout.close()
open(os.path.join(save_dir, dataset_name + ".size"), "wb").write(np.array(loaded_size, dtype=np.uint64).tobytes())
### To avoid tokenizing again we pass the tokenized column to use later
# return enc_text, loaded_size


def load_pq_docs_once_avoidIO(pq_df, content_col, save_dir, dataset_name, tokenize, num_threads):
global args_tokenize, encoded_docs, loaded_size
args_tokenize = tokenize

pre_sep = b"\xff\xff"
post_sep = b""

if not os.path.exists(save_dir):
os.mkdir(save_dir)

@@ -158,31 +100,6 @@ def load_pq_docs_once_avoidIO(pq_df, content_col, save_dir, dataset_name, tokeni
# return enc_text, loaded_size


def gen_output_doc(args):
global remove_ex, args_tokenize

this_idx, row = args

if this_idx in remove_ex:
if args_tokenize:
row = encode(row)
for start, end in remove_ex[this_idx][::-1]:
if start % 2:
start = start - 1
if end % 2:
end = end + 1
# print(start,end)
# end = int(end-6)
# print(start,end)
row = row[:start] + row[end:]
row = decode(row)
else:
for start, end in remove_ex[this_idx][::-1]:
# print(start,end)
row = row[:start] + row[end:]
return row


def gen_output_doc_once(args):
global remove_ex, args_tokenize, encoded_docs

@@ -208,27 +125,6 @@ def gen_output_doc_once(args):
return row


def save_deduped_pq(pq_df, output_dir, content_col, num_threads, tokenize):
global args_tokenize, remove_ex
args_tokenize = tokenize

# pq_df = pd.read_parquet(input_pq_list)
pre_content_col_size = sum(pq_df[content_col].str.len())

### Removing the repeated subsequences from all parquet docs
docs = [(i, row) for i, row in enumerate(pq_df[content_col])]
p = mp.get_context("fork").Pool(int(num_threads))
docs = p.map(gen_output_doc, docs)

pq_df[content_col] = docs
deduped_content_col_size = sum(pq_df[content_col].str.len())

#### saving the output parquet file once
pq_df.to_parquet(output_dir)

return pre_content_col_size, deduped_content_col_size


def save_deduped_pq_once(pq_df, output_dir, content_col, num_threads, tokenize):
global args_tokenize, remove_ex
args_tokenize = tokenize
@@ -251,96 +147,6 @@ def save_deduped_pq_once(pq_df, output_dir, content_col, num_threads, tokenize):
return pre_content_col_size, deduped_content_col_size


def extract_dup_per_doc(size_file, repeated_pairs):
global remove_ex
remove = []
fin = open(repeated_pairs)
for line in fin:
if 'out' in line: break
for line in fin:
remove.append(list(map(int, line.split())))

sizes = np.frombuffer(open(size_file, "rb").read(), dtype=np.uint64)

remove_ex = defaultdict(list)

# count_between_docs = 0
# duplicate_between_docs = [] ### for printing and investigation
ptr = 0
for i, byte_start in enumerate(sizes[:-1]):
byte_end = sizes[i + 1]
# print(byte_start, byte_end, remove[ptr])
while ptr < len(remove) and byte_start <= remove[ptr][0] < byte_end:
# print(remove[ptr])

##### if a duplicate is made from two subsequent documents,
##### Do not remove it as each part might be the only occurrence in its related doc
##### This follows our strategy to retain the first occurrence of each duplicate
if remove[ptr][1] > byte_end + 6:
# count_between_docs += 1
# duplicate_between_docs.append(i) ### for printing and investigation
ptr += 1
continue ### Do not remove this duplicate

# The magic value 6 here corresponds to the 4-byte index prefix followed by \xff\xff.
remove_ex[i].append((max(int(remove[ptr][0] - byte_start - 6), 0),
int(min(int(remove[ptr][1] - byte_start),
byte_end - byte_start)) - 6)) ################## added -6 to exclude sep
ptr += 1
# print ('############# Number of duplicate made from two subsequent documents: ', count_between_docs)
# print ('############# Number of duplicate made from two subsequent documents: ', duplicate_between_docs)

# df_dict = pd.DataFrame(remove_ex)
# print(remove_ex)
# return remove_ex


def extract_dup_per_doc_avoidIO(repeated_pairs):
global remove_ex, loaded_size
remove = []
fin = open(repeated_pairs)
for line in fin:
if 'out' in line: break
for line in fin:
remove.append(list(map(int, line.split())))

### Avoid I/O process for .size file to speed up the process
# sizes = np.frombuffer(open(size_file, "rb").read(), dtype=np.uint64)
sizes = loaded_size

remove_ex = defaultdict(list)

# count_between_docs = 0
# duplicate_between_docs = [] ### for printing and investigation
ptr = 0
for i, byte_start in enumerate(sizes[:-1]):
byte_end = sizes[i + 1]
# print(byte_start, byte_end, remove[ptr])
while ptr < len(remove) and byte_start <= remove[ptr][0] < byte_end:
# print(remove[ptr])

##### if a duplicate is made from two subsequent documents,
##### Do not remove it as each part might be the only occurrence in its related doc
##### This follows our strategy to retain the first occurrence of each duplicate
if remove[ptr][1] > byte_end + 6:
# count_between_docs += 1
# duplicate_between_docs.append(i) ### for printing and investigation
ptr += 1
continue ### Do not remove this duplicate

# The magic value 6 here corresponds to the 4-byte index prefix followed by \xff\xff.
remove_ex[i].append((max(int(remove[ptr][0] - byte_start - 6), 0),
int(min(int(remove[ptr][1] - byte_start),
byte_end - byte_start)) - 6)) ################## added -6 to exclude sep
ptr += 1
# print ('############# Number of duplicate made from two subsequent documents: ', count_between_docs)
# print ('############# Number of duplicate made from two subsequent documents: ', duplicate_between_docs)

# df_dict = pd.DataFrame(remove_ex)
# print(remove_ex)
# return remove_ex


def extract_dup_per_doc_avoidIO_further(repeated_pairs):
global remove_ex, loaded_size
remove = []
@@ -395,9 +201,3 @@ def extract_dup_per_doc_avoidIO_further(repeated_pairs):
int(min(int(remove[ptr][1] - byte_start),
byte_end - byte_start)) - 6)) ################## added -6 to exclude sep
ptr += 1
# print ('############# Number of duplicate made from two subsequent documents: ', count_between_docs)
# print ('############# Number of duplicate made from two subsequent documents: ', duplicate_between_docs)

# df_dict = pd.DataFrame(remove_ex)
# print(remove_ex)
# return remove_ex
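
As an aside for readers of the surviving `extract_dup_per_doc_avoidIO_further` path above, the sketch below (not part of this commit) works through the byte-offset arithmetic with concrete numbers, assuming the 6-byte per-document separator (4-byte index prefix plus `\xff\xff`) described in the inline comment.

```python
# Illustration only: mirrors the offset arithmetic in extract_dup_per_doc_avoidIO_further.
# A globally reported duplicate byte range is shifted into document-local coordinates,
# and the 6-byte separator written before each document is subtracted out.
SEP_LEN = 6  # 4-byte index prefix + b"\xff\xff"

def to_local_range(start, end, byte_start, byte_end):
    local_start = max(start - byte_start - SEP_LEN, 0)
    local_end = min(end - byte_start, byte_end - byte_start) - SEP_LEN
    return local_start, local_end

# Two concatenated documents with 100 and 80 content bytes each:
sizes = [0, 106, 192]  # cumulative sizes, separators included
print(to_local_range(30, 50, sizes[0], sizes[1]))    # (24, 44) inside doc 0
print(to_local_range(110, 130, sizes[1], sizes[2]))  # (0, 18) inside doc 1
```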
