dataset tokenization script improvements #106

Merged
merged 12 commits on Apr 24, 2024
105 changes: 63 additions & 42 deletions scripts/tokenize_dataset.py
@@ -1,79 +1,100 @@
#!/usr/bin/env python3

import argparse

from datasets import Dataset
from datasets import Dataset, Features, Value, load_dataset
from huggingface_hub import HfApi
from transformers import AutoTokenizer

from delphi.dataset.tokenization import tokenize_dataset
from delphi.eval.utils import load_validation_dataset
from delphi.dataset.tokenization import tokenize_and_upload_split

if __name__ == "__main__":
parser = argparse.ArgumentParser(description="")
parser = argparse.ArgumentParser(description="", allow_abbrev=False)

parser.add_argument(
"--input-dataset-name",
"--in-repo-id",
"-i",
type=str,
required=True,
help="Text dataset from huggingface to tokenize",
)
parser.add_argument(
"--output-dataset-name",
"--feature",
"-f",
type=str,
help="Name of the tokenized dataset to upload to huggingface",
required=True,
help="Name of the column containing text documents in the input dataset",
)
parser.add_argument(
"--tokenizer-name",
"--split",
"-s",
type=str,
help="Name of the tokenizer from huggingface",
required=True,
help="Split of the dataset to be tokenized, supports slicing like 'train[:10%%]'",
)
parser.add_argument(
"--token",
"--out-repo-id",
"-o",
type=str,
help="Hugging Face API token",
required=True,
help="Name of the tokenized dataset to upload to huggingface",
)
parser.add_argument(
"--context-size",
"--tokenizer",
"-r",
type=str,
required=True,
help="Name of the tokenizer from huggingface",
)
parser.add_argument(
"--seq-len",
"-l",
type=int,
default=512,
required=True,
help="Context size of the tokenized dataset as input of the model",
)
parser.add_argument(
"--hf-token",
"-t",
type=str,
help="Hugging Face API token",
)
parser.add_argument(
"--batch-size",
"-b",
type=int,
default=50,
help="Batch size of text inputs into the tokenizer",
help="Size of input into batched tokenization",
)
parser.add_argument(
"--column-name",
type=str,
help="Name of the column containing text documents in the input dataset",
"--chunk-size",
"-c",
type=int,
default=200_000,
help="Size of the parquet chunks uploaded to HuggingFace",
)
args = parser.parse_args()

input_dataset = load_validation_dataset(args.input_dataset_name)
tokenizer = AutoTokenizer.from_pretrained(args.tokenizer_name)

if args.column_name:
text_docs = input_dataset[args.column_name]
else:
if len(input_dataset.column_names) > 1:
raise ValueError("There is more than one column in the specified dataset")
text_docs = input_dataset[input_dataset.column_names[0]]

tokenized_dataset = tokenize_dataset(
text_docs,
tokenizer,
context_size=args.context_size,
batch_size=args.batch_size,
)
output_dataset = Dataset.from_dict(
{
"tokens": tokenized_dataset,
}
print(f"Loading dataset '{args.in_repo_id}'...")
in_dataset_split = load_dataset(
args.in_repo_id,
split=args.split,
features=Features({args.feature: Value("string")}),
)
assert isinstance(in_dataset_split, Dataset)
print(f"Loading tokenizer '{args.tokenizer}'...")
tokenizer = AutoTokenizer.from_pretrained(args.tokenizer)
assert tokenizer.bos_token_id is not None, "Tokenizer must have a bos_token_id"
assert tokenizer.eos_token_id is not None, "Tokenizer must have an eos_token_id"

output_dataset.push_to_hub(
repo_id=args.output_dataset_name,
private=False,
token=args.token,
api = HfApi(token=args.hf_token)
api.create_repo(repo_id=args.out_repo_id, repo_type="dataset", exist_ok=True)
tokenize_and_upload_split(
dataset_split=in_dataset_split,
split_name=args.split.split("[")[0],
tokenizer=tokenizer,
seq_len=args.seq_len,
batch_size=args.batch_size,
chunk_size=args.chunk_size,
out_repo_id=args.out_repo_id,
api=api,
)
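
For reference, a rough programmatic equivalent of running the updated script; the repo IDs, tokenizer name, and token below are placeholders, not values from this PR:

from datasets import Features, Value, load_dataset
from huggingface_hub import HfApi
from transformers import AutoTokenizer

from delphi.dataset.tokenization import tokenize_and_upload_split

# Placeholder names; substitute your own repos, tokenizer, and HF token.
in_dataset_split = load_dataset(
    "user/text-corpus",
    split="train[:10%]",
    features=Features({"text": Value("string")}),
)
tokenizer = AutoTokenizer.from_pretrained("user/tokenizer")
api = HfApi(token="hf_...")
api.create_repo(repo_id="user/tokenized-corpus", repo_type="dataset", exist_ok=True)
tokenize_and_upload_split(
    dataset_split=in_dataset_split,
    split_name="train",  # split name without the slice, as in the script above
    tokenizer=tokenizer,
    seq_len=512,
    batch_size=50,
    chunk_size=200_000,
    out_repo_id="user/tokenized-corpus",
    api=api,
)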
125 changes: 81 additions & 44 deletions src/delphi/dataset/tokenization.py
@@ -1,13 +1,17 @@
import io
from collections import deque
from typing import Optional
from collections.abc import Generator

from datasets import Dataset
from huggingface_hub import HfApi
from tqdm.auto import trange
from transformers import PreTrainedTokenizerBase


def extend_deque(
dq: deque[int],
deq: deque[int],
context_size: int,
text_documents: list[str],
dataset: Dataset,
doc_idx: int,
tokenizer: PreTrainedTokenizerBase,
batch_size: int,
@@ -25,61 +29,54 @@ def extend_deque(
dataset: Dataset of (untokenized) text documents to be tokenized.
doc_idx: Index of the current text story.
tokenizer: Tokenizer to encode the text strings.
batch_size: The size of input into batched tokenization.
Returns:
int: Updated index in the text documents dataset.
"""
while len(dq) < context_size and doc_idx < len(text_documents):
text_doc = text_documents[doc_idx : doc_idx + batch_size]
feature = dataset.column_names[0]
while len(deq) < context_size and doc_idx < len(dataset):
documents = dataset[doc_idx : doc_idx + batch_size][feature]
batch_input_ids = tokenizer(
text_doc, return_attention_mask=False, add_special_tokens=False
documents, return_attention_mask=False, add_special_tokens=False
)["input_ids"]
for input_ids in batch_input_ids:
dq.extend(input_ids + [tokenizer.eos_token_id])
for input_ids in batch_input_ids: # type: ignore
deq.extend(input_ids + [tokenizer.eos_token_id])
doc_idx += batch_size
return doc_idx
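
# Illustration (not part of this PR): extend_deque pulls documents in batches of
# batch_size, tokenizes them without special tokens, and appends each document's
# token ids plus an eos token to the deque until it holds at least context_size
# tokens or the dataset is exhausted. With a hypothetical two-document dataset ds
# and a tokenizer tok whose eos_token_id is 0, a call might look like:
#
#     deq: deque[int] = deque()
#     doc_idx = extend_deque(deq, context_size=8, dataset=ds, doc_idx=0,
#                            tokenizer=tok, batch_size=2)
#     # e.g. deq == deque([17, 23, 5, 0, 41, 55, 0]) and doc_idx == 2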


def make_new_samples(
dq: deque[int], context_size: int, bos_token_id: int
) -> list[list[int]]:
def make_new_sample(deq: deque[int], context_size: int, bos_token_id: int) -> list[int]:
"""
Generates new samples for training by creating sequences of tokens
from the deque until the deque does not hold enough tokens to generate
another sample.
Generates a new sample for training by creating a sequence of tokens
from the deque.

Note: the model is unable to use the last token in an input sequence,
so we repeat this token in the next input sequence.

Args:
dq: Deque containing tokenized tokens.
deq: Deque containing tokenized tokens.
context_size: Size of the context (input sequences).
bos_token_id: bos_token_id of the tokenizer used.

Returns:
list[list[int]]: List of token sequences of the same length(context_size).
list[int]: token sequence.
"""

samples = []
while len(dq) >= context_size:
sample = [bos_token_id]

# For the first (n-1) elements, pop from the left of the deque
# and add to the new sample, the n-th element will be retained
# in the deque for making the next sample.
for _ in range(context_size - 1):
sample.append(dq.popleft())
sample.append(dq[0])

samples.append(sample)
return samples
sample = [bos_token_id]
# For the first (n-1) elements, pop from the left of the deque
# and add to the new sample, the n-th element will be retained
# in the deque for making the next sample.
for _ in range(context_size - 1):
sample.append(deq.popleft())
sample.append(deq[0])
return sample
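
# Quick illustration (not part of this PR) of the one-token overlap between
# consecutive samples, using plain ints and a made-up bos_token_id of 1:
#
#     from collections import deque
#     deq = deque([10, 11, 12, 13, 14, 15])
#     first = make_new_sample(deq, context_size=4, bos_token_id=1)
#     # first == [1, 10, 11, 12, 13]; deq is now deque([13, 14, 15]), i.e. token 13
#     # is retained and will be the first non-bos token of the next sample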


def tokenize_dataset(
text_documents: list[str],
dataset: Dataset,
tokenizer: PreTrainedTokenizerBase,
context_size: int,
seq_len: int,
batch_size: int,
) -> list[list[int]]:
) -> Generator[list[int], None, None]:
"""
Tokenizes the input text documents using the provided tokenizer and
generates token sequences of the specified length.
@@ -88,20 +85,60 @@ def tokenize_dataset(
dataset: Dataset with a single text column whose documents are tokenized.
tokenizer: Tokenizer to encode the text strings.
seq_len: Length of the generated token sequences.
batch_size: The size of input into batched tokenization.

Returns:
list[list[int]]: List of token sequences of length equal to context_size.
Token sequences of length equal to context_size.
"""

dq = deque()
assert tokenizer.bos_token_id is not None
deq = deque()
doc_idx = 0
samples = []
# iterate through the text documents and tokenize them
while doc_idx < len(dataset):
doc_idx = extend_deque(deq, seq_len, dataset, doc_idx, tokenizer, batch_size)
yield make_new_sample(deq, seq_len, tokenizer.bos_token_id)
# We discard the last chunk, so no processing on the remainder of the deque here

while doc_idx < len(text_documents):
doc_idx = extend_deque(
dq, context_size, text_documents, doc_idx, tokenizer, batch_size
)
samples.extend(make_new_samples(dq, context_size, tokenizer.bos_token_id))

# We discard the last chunk, so no processing on the remainder of the deque here
return samples
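
# Sketch (not from this PR) of consuming the generator directly, e.g. in a test;
# ds is a hypothetical single-column text Dataset and tok a tokenizer with bos/eos
# tokens set:
#
#     seqs = list(tokenize_dataset(ds, tok, seq_len=512, batch_size=50))
#     # every sample starts with tok.bos_token_id, and consecutive samples overlap
#     # by one token, as described for make_new_sample above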
def tokenize_and_upload_split(
dataset_split: Dataset,
split_name: str,
tokenizer: PreTrainedTokenizerBase,
seq_len: int,
batch_size: int,
chunk_size: int,
out_repo_id: str,
api: HfApi,
):
seq_gen = tokenize_dataset(
dataset_split,
tokenizer,
seq_len=seq_len,
batch_size=batch_size,
)
seq_it = iter(seq_gen)
print(f"Tokenizing {split_name=}...")
chunk_idx = 0
done = False
while not done:
tokens = []
print(f"Processing chunk {chunk_idx}...")
for _ in trange(chunk_size):
try:
tokens.append(next(seq_it))
except StopIteration:
done = True
break
ds_chunk = Dataset.from_dict({"tokens": tokens})
ds_parquet_chunk = io.BytesIO()
ds_chunk.to_parquet(ds_parquet_chunk)
chunk_name = f"{split_name}-{chunk_idx:05}.parquet"
print(f"Uploading {chunk_name}...")
api.upload_file(
path_or_fileobj=ds_parquet_chunk,
path_in_repo=f"data/{chunk_name}",
repo_id=out_repo_id,
repo_type="dataset",
)
chunk_idx += 1
print("Done.")