Commit c04356b: add redaction code
loubnabnl committed Aug 24, 2023
1 parent 5cd79f9 commit c04356b
Showing 6 changed files with 531 additions and 7 deletions.
5 changes: 5 additions & 0 deletions pii/ner/pii_redaction/README.md
@@ -0,0 +1,5 @@
# PII redaction

Mask the PII detected by the NER pipeline (emails, names, keys, passwords and IP addresses) in a dataset:
```bash
python main_redact.py --dataset_name /fsx/leandro/data/pii_result/ada --target_dataset ada-no-pii --save_path_disk ada-no-pii-local
```
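The default `--save_mode` is `manual_shards`; the other flags are defined in `main_redact.py` below. A hypothetical invocation that pushes both the redacted dataset and the checks subset straight to the Hub (placeholder username) might look like:

```bash
python main_redact.py \
    --dataset_name /fsx/leandro/data/pii_result/ada \
    --lang ada \
    --save_mode hub \
    --save_mode_checks hub \
    --target_dataset ada-no-pii \
    --hub_username <your-hf-username>
```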
288 changes: 288 additions & 0 deletions pii/ner/pii_redaction/main_redact.py
@@ -0,0 +1,288 @@
"""Mask detected PII in a dataset.
"""

import argparse
import json
import logging
import random
import time
from functools import partial
from pprint import pformat

from datasets import load_dataset
from datasets.utils.logging import set_verbosity_info

from manual_sharding import save_manual_shards
from utils import get_replacements, redact_pii_batch


def parseArgs():
    parser = argparse.ArgumentParser(description="PII detection and redaction")
    parser.add_argument(
        "--dataset_name",
        default="bigcode/pii-for-code",
        type=str,
        help="HF repo name/path of the dataset.",
    )
    parser.add_argument(
        "--num_load_proc",
        default=64,
        type=int,
        help="Number of processes to use for loading the dataset.",
    )
    parser.add_argument(
        "--lang",
        default="ada",
        type=str,
        help="Language to redact PII in.",
    )
    parser.add_argument(
        "--text_column",
        default="content",
        type=str,
        help="Text column to use; it will be renamed to content.",
    )
    parser.add_argument(
        "--split",
        default="train",
        type=str,
        help="Dataset split to process.",
    )
    parser.add_argument(
        "--batch_size",
        default=100,
        type=int,
        help="Batch size for the PII detection/redaction.",
    )
    parser.add_argument(
        "--seed",
        default=0,
        type=int,
        help="Seed for the random module.",
    )
    parser.add_argument(
        "--num_proc",
        default=96,
        type=int,
        help="Number of processes to use for the PII detection/redaction.",
    )
    parser.add_argument(
        "--no_redaction",
        action="store_true",
        help="If set, we don't perform redaction.",
    )
    parser.add_argument(
        "--load_replacements",
        default=True,
        help="If set, we load the replacements from the file replacements.json.",
    )
    parser.add_argument(
        "--add_reference_text",
        default=False,
        type=bool,
        help="If True, we add the reference text with PII between delimiters "
        "in the redacted text (used for visualization).",
    )
    parser.add_argument(
        "--check_all_files",
        action="store_true",
        help="If set, we check all files, not only the ones that contain PII.",
    )
    parser.add_argument(
        "--check_sampling_size",
        default=0,
        type=int,
        help="Number of samples to check for PII.",
    )
    # for saving the dataset: either push to the HF Hub, save locally with datasets, or save manual shards
    parser.add_argument(
        "--save_mode",
        default="manual_shards",
        type=str,
        choices=["hub", "local", "manual_shards"],
        help="How to save the dataset.",
    )
    parser.add_argument(
        "--save_mode_checks",
        default="hub",
        type=str,
        choices=["hub", "local", "manual_shards"],
        help="How to save the checks dataset.",
    )
    # name of the target dataset on the hub
    parser.add_argument(
        "--target_dataset",
        default="bigcode-pii2",
        type=str,
        help="HF repo name of the target dataset when save_mode=hub.",
    )
    parser.add_argument(
        "--hub_username",
        default="loubnabnl",
        type=str,
        help="Username for the hub.",
    )
    parser.add_argument(
        "--save_path_disk",
        default="bigcode-pii2-local",
        type=str,
        help="Path to save the dataset on disk when save_mode=local.",
    )
    return parser.parse_args()


def get_check_ds(ds, args):
    if not args.check_all_files:
        ds_checks = ds.filter(
            lambda exs: exs["modified"],
            batched=True,
            batch_size=args.batch_size,
            num_proc=args.num_proc,
        )
    else:
        ds_checks = ds
    # a sampling size of 0 means "keep every candidate file"
    sampling_size = args.check_sampling_size if args.check_sampling_size else len(ds_checks)
    idx_samples = random.sample(
        range(len(ds_checks)), min(len(ds_checks), sampling_size)
    )
    ds_checks = ds_checks.select(idx_samples)

    return ds_checks


def check_uniques(example, uniques):
    """Check if the current id is still in the set of unique ids and remove it if so."""
    if example["id"] in uniques:
        uniques.remove(example["id"])
        return True
    else:
        return False


def main():
    set_verbosity_info()
    logger = logging.getLogger(__name__)
    logger.setLevel(logging.INFO)
    logging.basicConfig(
        format="%(asctime)s - %(levelname)s - %(name)s - %(message)s",
        datefmt="%m/%d/%Y %H:%M:%S",
        level=logging.INFO,
        handlers=[logging.FileHandler("pii.log"), logging.StreamHandler()],
    )
    args = parseArgs()
    logger.info(
        f"** The job is running with the following arguments: **\n{args}\n **** "
    )

    logger.info(f" ===== Loading {args.dataset_name} =====")
    ds = load_dataset(
        args.dataset_name,
        split=args.split,
        use_auth_token=True,
        num_proc=args.num_load_proc,
    )
    if args.text_column != "content":
        ds = ds.rename_column(args.text_column, "content")

    logger.info(" ===== Deduplicating dataset =====")
    # Deduplication based on ids
    uniques = set(ds["id"])
    frac = len(uniques) / len(ds)
    logger.info(f"Fraction of duplicates: {1 - frac:.2%}")
    logger.info(f"Dataset:\n{ds}")
    # Deduplicate data and apply heuristics
    t_start = time.time()
    ds_pii = ds.filter(
        check_uniques, fn_kwargs={"uniques": uniques}, num_proc=args.num_proc
    )
    logger.info(f"Time to filter dataset: {time.time() - t_start:.2f}")
    logger.info(f"Dataset after dedup:\n{ds_pii}")

    logger.info(
        f"Number of samples that contained PII: {sum([1 if x['entities'] else 0 for x in ds_pii])}"
    )
    logger.info(
        f"Total number of secrets found: {sum([len(x['entities']) for x in ds_pii])}"
    )

    # redact PII in the dataset
    logger.info(" ===== Applying PII redaction =====")
    random.seed(args.seed)

    replacements = get_replacements()
    with open("replacements.json", "w") as f:
        json.dump(replacements, f)
    logger.info(f"Using the following replacements:\n{pformat(replacements)}")
    ds_pii = ds_pii.map(
        partial(
            redact_pii_batch,
            replacements=replacements,
            add_references=args.add_reference_text,
        ),
        batched=True,
        batch_size=args.batch_size,
        num_proc=args.num_proc,
        load_from_cache_file=False,
    )
    logger.info(f"Dataset info after PII redaction:\n{ds_pii}")

    # check the dataset
    logger.info(
        f" ===== Checking {args.check_sampling_size} samples from those modified in the dataset ====="
    )
    ds_checks = get_check_ds(ds_pii, args)

    # save the checks dataset
    if len(ds_checks) == 0:
        logger.info("Checks dataset is empty. Not saving anything.")
    else:
        logger.info(f"Checks dataset info {ds_checks}")
        if args.save_mode_checks == "hub":
            logger.info(
                f"Pushing the checks dataset to the Hub as {args.target_dataset}_checks"
            )
            ds_checks.push_to_hub(args.target_dataset + "_checks")

        elif args.save_mode_checks == "local":
            logger.info("Saving the checks dataset to disk")
            ds_checks.save_to_disk(args.save_path_disk + "_checks")

        elif args.save_mode_checks == "manual_shards":
            logger.info("Saving the checks dataset in manual shards")
            save_manual_shards(
                ds_checks,
                user=args.hub_username,
                remote_dataset_repo=args.target_dataset + "_checks",
            )

    logger.info("Removing columns that are not needed for the final dataset")
    columns = ["content", "modified", "entities"]
    if args.add_reference_text:
        columns.append("references")
    ds_pii = ds_pii.remove_columns(columns)
    ds_pii = ds_pii.rename_column("new_content", "content")
    logger.info(f"Dataset info after removing columns:\n{ds_pii}")

    # save the final dataset
    if args.save_mode == "hub":
        logger.info(
            f" ===== Pushing the dataset to the Hub as: {args.target_dataset} ====="
        )
        ds_pii.push_to_hub(args.target_dataset)

    elif args.save_mode == "local":
        logger.info(" ===== Saving the dataset to disk =====")
        ds_pii.save_to_disk(args.save_path_disk)

    elif args.save_mode == "manual_shards":
        logger.info(" ===== Saving the dataset in manual shards =====")
        save_manual_shards(
            ds_pii, user=args.hub_username, remote_dataset_repo=args.target_dataset
        )

    logger.info(" ===== Dataset saved successfully =====")


if __name__ == "__main__":
    main()
54 changes: 54 additions & 0 deletions pii/ner/pii_redaction/manual_sharding.py
@@ -0,0 +1,54 @@
import os
import time
from multiprocessing import Pool

from huggingface_hub import Repository
from tqdm import tqdm


def save_shard(shard_tuple):
    """Save a single shard to disk."""
    filename, shard = shard_tuple
    # use to_json instead to save as a json file
    shard.to_parquet(filename)


def save_manual_shards(ds, user="loubnabnl", remote_dataset_repo="bigcode-pii-pjj"):
    """Save the dataset in manual shards.

    Args:
        ds (Dataset): dataset to be saved
        user (str): user name
        remote_dataset_repo (str): remote dataset repository
    """
    # this creates a folder OUT_PATH that is a clone of REMOTE_DATASET_REPO;
    # the shards are saved inside it and pushed to the hub with git add/commit/push
    out_path = remote_dataset_repo
    # clone the remote repo if the output path doesn't already exist
    if not os.path.exists(out_path):
        repo = Repository(
            local_dir=out_path,
            clone_from=user + "/" + remote_dataset_repo,
            repo_type="dataset",
            use_auth_token=True,
            git_user=user,
        )

    # the files will be numerous, so we save them in a folder called data inside out_path
    os.makedirs(out_path + "/data", exist_ok=True)
    SHARD_SIZE = 1000 << 20
    if ds._indices is not None:
        dataset_nbytes = ds.data.nbytes * len(ds._indices) / len(ds.data)
    else:
        dataset_nbytes = ds.data.nbytes
    num_shards = int(dataset_nbytes / SHARD_SIZE) + 1
    print(f"Number of shards: {num_shards}")

    print("sharding the dataset")
    t_start = time.time()
    shards = (
        ds.shard(num_shards=num_shards, index=i, contiguous=True)
        for i in range(num_shards)
    )
    # use f"{OUT_PATH}/data/train-{index:05d}-of-{num_shards:05d}.json" instead for json files
    filenames = (
        f"{out_path}/data/train-{index:05d}-of-{num_shards:05d}.parquet"
        for index in range(num_shards)
    )

    with Pool(16) as p:
        list(
            tqdm(
                p.imap_unordered(save_shard, zip(filenames, shards), chunksize=4),
                total=num_shards,
            )
        )
    print(f"Time to save dataset: {time.time() - t_start:.2f}")
    # to push the dataset to the hub do: git add/commit/push inside OUT_PATH
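The closing comment leaves the push step manual. Assuming the folder created by `Repository` is a regular git clone of the dataset repo, the follow-up would be roughly:

```bash
cd <OUT_PATH>   # the local clone, named after remote_dataset_repo
git add data
git commit -m "Add redacted data shards"
git push
```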
1 change: 1 addition & 0 deletions pii/ner/pii_redaction/replacements.json
@@ -0,0 +1 @@
{"EMAIL": ["<EMAIL>"], "KEY": ["<KEY>"], "NAME": ["<NAME>"], "PASSWORD": ["<PASSWORD>"], "IP_ADDRESS": {"IPv4": ["172.16.31.10", "172.16.58.3", "172.16.17.32", "192.168.127.12", "192.168.3.11"], "IPv6": ["fd00:c2b6:b24b:be67:2827:688d:e6a1:6a3b", "fd00:a516:7c1b:17cd:6d81:2137:bd2a:2c5b", "fc00:e968:6179::de52:7100", "fc00:db20:35b:7399::5", "fdf8:f53e:61e4::18"]}}
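The table is a flat list of placeholder strings for most tags but nested by IP version for `IP_ADDRESS`. The actual substitution lives in `utils.redact_pii_batch`, which is not part of this commit; a minimal illustrative sketch of sampling a replacement from this structure (hypothetical helper, assuming `replacements.json` is in the working directory) could be:

```python
import json
import random

def sample_replacement(tag, value, replacements):
    """Pick a substitute for one detected PII entity (illustrative only)."""
    pool = replacements[tag]
    if tag == "IP_ADDRESS":
        # the IP pools are split by version; choose based on the detected value
        pool = pool["IPv6"] if ":" in value else pool["IPv4"]
    return random.choice(pool)

with open("replacements.json") as f:
    replacements = json.load(f)

print(sample_replacement("EMAIL", "jane@example.com", replacements))  # -> <EMAIL>
print(sample_replacement("IP_ADDRESS", "10.0.0.1", replacements))     # -> one of the IPv4 placeholders
```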