diff --git a/scripts/tokenize_dataset.py b/scripts/tokenize_dataset.py
index de46a8d5..303a8ff9 100755
--- a/scripts/tokenize_dataset.py
+++ b/scripts/tokenize_dataset.py
@@ -1,79 +1,100 @@
 #!/usr/bin/env python3
-
 import argparse
 
-from datasets import Dataset
+from datasets import Dataset, Features, Value, load_dataset
+from huggingface_hub import HfApi
 from transformers import AutoTokenizer
 
-from delphi.dataset.tokenization import tokenize_dataset
-from delphi.eval.utils import load_validation_dataset
+from delphi.dataset.tokenization import tokenize_and_upload_split
 
 if __name__ == "__main__":
-    parser = argparse.ArgumentParser(description="")
+    parser = argparse.ArgumentParser(description="", allow_abbrev=False)
     parser.add_argument(
-        "--input-dataset-name",
+        "--in-repo-id",
+        "-i",
         type=str,
+        required=True,
         help="Text dataset from huggingface to tokenize",
     )
     parser.add_argument(
-        "--output-dataset-name",
+        "--feature",
+        "-f",
         type=str,
-        help="Name of the tokenized dataset to upload to huggingface",
+        required=True,
+        help="Name of the column containing text documents in the input dataset",
    )
     parser.add_argument(
-        "--tokenizer-name",
+        "--split",
+        "-s",
         type=str,
-        help="Name of the tokenizer from huggingface",
+        required=True,
+        help="Split of the dataset to be tokenized, supports slicing like 'train[:10%%]'",
     )
     parser.add_argument(
-        "--token",
+        "--out-repo-id",
+        "-o",
         type=str,
-        help="Hugging Face API token",
+        required=True,
+        help="Name of the tokenized dataset to upload to huggingface",
     )
     parser.add_argument(
-        "--context-size",
+        "--tokenizer",
+        "-r",
+        type=str,
+        required=True,
+        help="Name of the tokenizer from huggingface",
+    )
+    parser.add_argument(
+        "--seq-len",
+        "-l",
         type=int,
-        default=512,
+        required=True,
         help="Context size of the tokenized dataset as input of the model",
     )
+    parser.add_argument(
+        "--hf-token",
+        "-t",
+        type=str,
+        help="Hugging Face API token",
+    )
     parser.add_argument(
         "--batch-size",
+        "-b",
         type=int,
         default=50,
-        help="Batch size of text inputs into the tokenizer",
+        help="Number of documents tokenized per batch",
     )
     parser.add_argument(
-        "--column-name",
-        type=str,
-        help="Name of the column containing text documents in the input dataset",
+        "--chunk-size",
+        "-c",
+        type=int,
+        default=200_000,
+        help="Number of token sequences per parquet chunk uploaded to HuggingFace",
     )
     args = parser.parse_args()
 
-    input_dataset = load_validation_dataset(args.input_dataset_name)
-    tokenizer = AutoTokenizer.from_pretrained(args.tokenizer_name)
-
-    if args.column_name:
-        text_docs = input_dataset[args.column_name]
-    else:
-        if len(input_dataset.column_names) > 1:
-            raise ValueError("There is more than one column in the specified dataset")
-        text_docs = input_dataset[input_dataset.column_names[0]]
-
-    tokenized_dataset = tokenize_dataset(
-        text_docs,
-        tokenizer,
-        context_size=args.context_size,
-        batch_size=args.batch_size,
-    )
-    output_dataset = Dataset.from_dict(
-        {
-            "tokens": tokenized_dataset,
-        }
+    print(f"Loading dataset '{args.in_repo_id}'...")
+    in_dataset_split = load_dataset(
+        args.in_repo_id,
+        split=args.split,
+        features=Features({args.feature: Value("string")}),
     )
+    assert isinstance(in_dataset_split, Dataset)
+    print(f"Loading tokenizer '{args.tokenizer}'...")
+    tokenizer = AutoTokenizer.from_pretrained(args.tokenizer)
+    assert tokenizer.bos_token_id is not None, "Tokenizer must have a bos_token_id"
+    assert tokenizer.eos_token_id is not None, "Tokenizer must have an eos_token_id"
 
-    output_dataset.push_to_hub(
-        repo_id=args.output_dataset_name,
-        private=False,
-        token=args.token,
+    api = HfApi(token=args.hf_token)
+    api.create_repo(repo_id=args.out_repo_id, repo_type="dataset", exist_ok=True)
+    tokenize_and_upload_split(
+        dataset_split=in_dataset_split,
+        split_name=args.split.split("[")[0],
+        tokenizer=tokenizer,
+        seq_len=args.seq_len,
+        batch_size=args.batch_size,
+        chunk_size=args.chunk_size,
+        out_repo_id=args.out_repo_id,
+        api=api,
     )
diff --git a/src/delphi/dataset/tokenization.py b/src/delphi/dataset/tokenization.py
index b800b64b..9447a4d8 100644
--- a/src/delphi/dataset/tokenization.py
+++ b/src/delphi/dataset/tokenization.py
@@ -1,13 +1,17 @@
+import io
 from collections import deque
-from typing import Optional
+from collections.abc import Generator
 
+from datasets import Dataset
+from huggingface_hub import HfApi
+from tqdm.auto import trange
 from transformers import PreTrainedTokenizerBase
 
 
 def extend_deque(
-    dq: deque[int],
+    deq: deque[int],
     context_size: int,
-    text_documents: list[str],
+    dataset: Dataset,
     doc_idx: int,
     tokenizer: PreTrainedTokenizerBase,
     batch_size: int,
@@ -25,61 +29,54 @@ def extend_deque(
-        text_documents: List of (untokenized) text documents to be tokenized.
+        dataset: Dataset of (untokenized) text documents to be tokenized.
         doc_idx: Index of the current text story.
         tokenizer: Tokenizer to encode the text strings.
+        batch_size: Number of documents to tokenize per batch.
 
     Returns:
         int: Updated index in the text documents dataset.
     """
-    while len(dq) < context_size and doc_idx < len(text_documents):
-        text_doc = text_documents[doc_idx : doc_idx + batch_size]
+    feature = dataset.column_names[0]
+    while len(deq) < context_size and doc_idx < len(dataset):
+        documents = dataset[doc_idx : doc_idx + batch_size][feature]
         batch_input_ids = tokenizer(
-            text_doc, return_attention_mask=False, add_special_tokens=False
+            documents, return_attention_mask=False, add_special_tokens=False
         )["input_ids"]
-        for input_ids in batch_input_ids:
-            dq.extend(input_ids + [tokenizer.eos_token_id])
+        for input_ids in batch_input_ids:  # type: ignore
+            deq.extend(input_ids + [tokenizer.eos_token_id])
         doc_idx += batch_size
     return doc_idx
 
 
-def make_new_samples(
-    dq: deque[int], context_size: int, bos_token_id: int
-) -> list[list[int]]:
+def make_new_sample(deq: deque[int], context_size: int, bos_token_id: int) -> list[int]:
     """
-    Generates new samples for training by creating sequences of tokens
-    from the deque until the deque does not hold enough tokens to generate
-    another sample.
+    Generates a new training sample by taking a sequence of tokens
+    from the left of the deque.
 
     Note: the model is unable to use the last token in an input sequence,
     so we repeat this token in the next input sequence.
 
     Args:
-        dq: Deque containing tokenized tokens.
+        deq: Deque containing token ids.
         context_size: Size of the context (input sequences).
         bos_token_id: bos_token_id of the tokenizer used.
 
     Returns:
-        list[list[int]]: List of token sequences of the same length(context_size).
+        list[int]: Token sequence of length context_size + 1.
     """
-
-    samples = []
-    while len(dq) >= context_size:
-        sample = [bos_token_id]
-
-        # For the first (n-1) elements, pop from the left of the deque
-        # and add to the new sample, the n-th element will be retained
-        # in the deque for making the next sample.
-        for _ in range(context_size - 1):
-            sample.append(dq.popleft())
-        sample.append(dq[0])
-
-        samples.append(sample)
-    return samples
+    sample = [bos_token_id]
+    # For the first (n-1) elements, pop from the left of the deque
+    # and add to the new sample, the n-th element will be retained
+    # in the deque for making the next sample.
+    for _ in range(context_size - 1):
+        sample.append(deq.popleft())
+    sample.append(deq[0])
+    return sample
 
 
 def tokenize_dataset(
-    text_documents: list[str],
+    dataset: Dataset,
     tokenizer: PreTrainedTokenizerBase,
-    context_size: int,
+    seq_len: int,
     batch_size: int,
-) -> list[list[int]]:
+) -> Generator[list[int], None, None]:
     """
     Tokenizes the input text documents using the provided tokenizer and
     generates token sequences of the specified length.
 
     Args:
-        text_documents: List[str],
-        tokenizer,
-        context_size,
+        dataset: Dataset of (untokenized) text documents.
+        tokenizer: Tokenizer to encode the text documents.
+        seq_len: Length of the context (input sequences).
+        batch_size: Number of documents to tokenize per batch.
 
     Returns:
-        list[list[int]]: List of token sequences of length equal to context_size.
+        Token sequences of length seq_len + 1.
     """
-
-    dq = deque()
+    assert tokenizer.bos_token_id is not None
+    deq = deque()
     doc_idx = 0
-    samples = []
+    # iterate through the text documents and tokenize them
+    while doc_idx < len(dataset):
+        doc_idx = extend_deque(deq, seq_len, dataset, doc_idx, tokenizer, batch_size)
+        if len(deq) >= seq_len:
+            yield make_new_sample(deq, seq_len, tokenizer.bos_token_id)
+    # We discard the last chunk, so no processing on the remainder of the deque here
 
-    while doc_idx < len(text_documents):
-        doc_idx = extend_deque(
-            dq, context_size, text_documents, doc_idx, tokenizer, batch_size
-        )
-        samples.extend(make_new_samples(dq, context_size, tokenizer.bos_token_id))
-    # We discard the last chunk, so no processing on the remainder of the deque here
-    return samples
+
+
+def tokenize_and_upload_split(
+    dataset_split: Dataset,
+    split_name: str,
+    tokenizer: PreTrainedTokenizerBase,
+    seq_len: int,
+    batch_size: int,
+    chunk_size: int,
+    out_repo_id: str,
+    api: HfApi,
+):
+    seq_gen = tokenize_dataset(
+        dataset_split,
+        tokenizer,
+        seq_len=seq_len,
+        batch_size=batch_size,
+    )
+    seq_it = iter(seq_gen)
+    print(f"Tokenizing {split_name=}...")
+    chunk_idx = 0
+    done = False
+    while not done:
+        tokens = []
+        print(f"Processing chunk {chunk_idx}...")
+        for _ in trange(chunk_size):
+            try:
+                tokens.append(next(seq_it))
+            except StopIteration:
+                done = True
+                break
+        ds_chunk = Dataset.from_dict({"tokens": tokens})
+        ds_parquet_chunk = io.BytesIO()
+        ds_chunk.to_parquet(ds_parquet_chunk)
+        chunk_name = f"{split_name}-{chunk_idx:05}.parquet"
+        print(f"Uploading {chunk_name}...")
+        api.upload_file(
+            path_or_fileobj=ds_parquet_chunk,
+            path_in_repo=f"data/{chunk_name}",
+            repo_id=out_repo_id,
+            repo_type="dataset",
+        )
+        chunk_idx += 1
+    print("Done.")
diff --git a/tests/dataset/test_tokenizer.py b/tests/dataset/test_tokenization.py
similarity index 55%
rename from tests/dataset/test_tokenizer.py
rename to tests/dataset/test_tokenization.py
index 99b2dcb3..4d765fbc 100644
--- a/tests/dataset/test_tokenizer.py
+++ b/tests/dataset/test_tokenization.py
@@ -2,9 +2,10 @@
 import random
 
 import pytest
+from datasets import Dataset
 from transformers import AutoTokenizer
 
-from delphi.dataset.tokenization import extend_deque, make_new_samples, tokenize_dataset
+from delphi.dataset.tokenization import extend_deque, make_new_sample, tokenize_dataset
 
 
 @pytest.fixture
@@ -12,32 +13,38 @@ def tokenizer():
     return AutoTokenizer.from_pretrained("delphi-suite/stories-tokenizer")
 
 
+def make_random_document(tokenizer):
+    all_token_ids = range(2, tokenizer.vocab_size)
+    n_tokens = random.randint(100, 800)
+    random_tokens = random.choices(all_token_ids, k=n_tokens)
+    return tokenizer.decode(random_tokens)
+
+
+def get_random_feature_name():
+    return "".join(random.choices("abcdefghijklmnopqrstuvwxyz", k=10))
+
+
 def test_extend_deque(tokenizer):
     CTX_SIZE = 10
     BATCH_SIZE = 2
     # generate 100 random stories
-    text_stories = [
-        " ".join(
-            [
-                tokenizer.decode(random.randint(3, tokenizer.vocab_size))
-                for _ in range(random.randint(100, 800))
-            ]
-        )
-        for _ in range(100)
-    ]
+    documents = [make_random_document(tokenizer) for _ in range(100)]
+    feature_name = get_random_feature_name()
+    dataset = Dataset.from_dict({feature_name: documents})
+
     prompt_idx = 0
-    dq = collections.deque()
+    deq = collections.deque()
 
-    while prompt_idx < len(text_stories):
+    while prompt_idx < len(dataset):
         prompt_idx = extend_deque(
-            dq, CTX_SIZE, text_stories, prompt_idx, tokenizer, BATCH_SIZE
+            deq, CTX_SIZE, dataset, prompt_idx, tokenizer, BATCH_SIZE
         )
-        if prompt_idx < len(text_stories) - 1:
+        if prompt_idx < len(dataset) - 1:
             # assert that the deque has grown large enough in each round
-            assert len(dq) >= CTX_SIZE
-        while len(dq) >= CTX_SIZE:
+            assert len(deq) >= CTX_SIZE
+        while len(deq) >= CTX_SIZE:
             for _ in range(CTX_SIZE - 1):
-                dq.popleft()
+                deq.popleft()
 
 
 def test_make_new_sample(tokenizer):
@@ -45,7 +52,9 @@
     total_tokens = random.randint(100, 1000)
     context_size = random.randint(5, total_tokens // 2)
     dq = collections.deque(random.choices(range(3, 1000), k=total_tokens))
-    samples = make_new_samples(dq, context_size, tokenizer.bos_token_id)
+    samples = []
+    while len(dq) >= context_size:
+        samples.append(make_new_sample(dq, context_size, tokenizer.bos_token_id))
     tokens_cnt = 0
     for i, sample in enumerate(samples):
         assert sample[0] == tokenizer.bos_token_id
@@ -67,22 +76,21 @@ def test_tokenize_dataset(tokenizer):
     CTX_SIZE = 10
     BATCH_SIZE = 2
 
-    text_stories = [
+    documents = [
         "Once upon a",
         "Mother woke up alert. She put on her coat",
         "Once upon a time, in a small town, there was a weird",
         "Once upon a time, there was a",
         "Sara and Tom are friends. They like to play in the park.",
     ]
-    correct_batches = [
-        [1, 432, 440, 261, 2, 367, 501, 1917, 372, 3398, 4037],
-        [1, 4037, 341, 577, 359, 342, 1854, 2, 432, 440, 261],
-        [1, 261, 403, 4045, 317, 261, 560, 1000, 4045, 406, 286],
-        [1, 286, 261, 2567, 2, 432, 440, 261, 403, 4045, 406],
-        [1, 406, 286, 261, 2, 787, 269, 396, 484, 415, 4037],
-        [1, 4037, 311, 519, 268, 326, 317, 264, 525, 4037, 2],
+    feature_name = get_random_feature_name()
+    dataset = Dataset.from_dict({feature_name: documents})
+    expected = [
+        [0, 431, 440, 260, 1, 46, 499, 1945, 368, 3443, 15],
+        [0, 15, 340, 576, 355, 337, 1887, 1, 431, 440, 260],
+        [0, 260, 399, 13, 314, 260, 560, 1005, 13, 402, 284],
+        [0, 284, 260, 2606, 1, 431, 440, 260, 399, 13, 402],
+        [0, 402, 284, 260, 1, 1370, 268, 415, 484, 412, 15],
     ]
-    assert (
-        tokenize_dataset(text_stories, tokenizer, CTX_SIZE, BATCH_SIZE)
-        == correct_batches
-    )
+    actual = list(tokenize_dataset(dataset, tokenizer, CTX_SIZE, BATCH_SIZE))
+    assert actual == expected
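
For reviewers, a minimal usage sketch of how the new pieces fit together when driven from Python rather than the CLI script above. The input/output repo ids and the "text" column name are placeholders, not values from this PR; the tokenizer id is the one used in the tests.

# Hypothetical end-to-end usage of the helpers added in this PR.
# Repo ids and the "text" column are placeholders; substitute real HuggingFace repos.
from datasets import Dataset, Features, Value, load_dataset
from huggingface_hub import HfApi
from transformers import AutoTokenizer

from delphi.dataset.tokenization import tokenize_and_upload_split

# Load a slice of a text dataset, forcing the chosen column to string type.
in_dataset_split = load_dataset(
    "user/some-text-dataset",  # placeholder input repo
    split="train[:1%]",
    features=Features({"text": Value("string")}),
)
assert isinstance(in_dataset_split, Dataset)

tokenizer = AutoTokenizer.from_pretrained("delphi-suite/stories-tokenizer")
api = HfApi()  # uses the locally saved HF token; pass token=... to override

# Create the output dataset repo, then tokenize and upload it in parquet chunks.
api.create_repo(repo_id="user/some-tokenized-dataset", repo_type="dataset", exist_ok=True)
tokenize_and_upload_split(
    dataset_split=in_dataset_split,
    split_name="train",
    tokenizer=tokenizer,
    seq_len=512,
    batch_size=50,
    chunk_size=200_000,
    out_repo_id="user/some-tokenized-dataset",
    api=api,
)

This mirrors what scripts/tokenize_dataset.py now does with --in-repo-id, --feature, --split, --tokenizer, --seq-len, --out-repo-id, --batch-size, and --chunk-size.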