Update tokenization script to upload to HF in chunks
Siwei Li committed Apr 19, 2024
1 parent 90147dd commit 9b344e8
Showing 3 changed files with 69 additions and 24 deletions.
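In outline, the change drops the old approach of pushing a whole DatasetDict with push_to_hub and instead serializes each split into fixed-size parquet shards that are uploaded one at a time with HfApi.upload_file. A minimal sketch of that pattern, assuming an illustrative helper name and placeholder repo id that do not appear in the diff:

# Sketch of the chunked-upload pattern this commit introduces; the helper name and
# repo id are illustrative, and the shard naming mirrors the code in the diff below.
import io
import math

from datasets import Dataset
from huggingface_hub import HfApi


def upload_split_in_chunks(
    tokens: list[list[int]],
    split: str,
    repo_id: str,
    api: HfApi,
    chunk_size: int = 200_000,
) -> None:
    n_chunks = math.ceil(len(tokens) / chunk_size)
    for chunk_idx in range(n_chunks):
        rows = tokens[chunk_idx * chunk_size : (chunk_idx + 1) * chunk_size]
        ds_chunk = Dataset.from_dict({"tokens": rows})
        buf = io.BytesIO()
        ds_chunk.to_parquet(buf)  # serialize the shard into an in-memory buffer
        api.upload_file(
            path_or_fileobj=buf,
            path_in_repo=f"data/{split}-{chunk_idx + 1:05}-of-{n_chunks:05}.parquet",
            repo_id=repo_id,
            repo_type="dataset",
        )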
43 changes: 31 additions & 12 deletions scripts/demo_upload_in_chunks.py
@@ -2,10 +2,14 @@

import argparse
import io
import random
import math
from typing import cast

from datasets import Dataset
from datasets import Dataset, DatasetDict, load_dataset
from huggingface_hub import HfApi
from transformers import AutoTokenizer

from delphi.dataset.tokenization import tokenize_dataset

if __name__ == "__main__":
parser = argparse.ArgumentParser(description="")
@@ -23,30 +27,45 @@
help="Hugging Face API token",
)
args = parser.parse_args()

splits = ["train", "validation"]

api = HfApi(token=args.hf_token)
api.create_repo(repo_id=args.output_dataset, repo_type="dataset")
# api.create_repo(repo_id=args.output_dataset, repo_type="dataset")

N_CHUNKS = 3
CHUNK_SIZE = 5
n_chunks = 0
CHUNK_SIZE = 200000
input_dataset = load_dataset("delphi-suite/stories")
input_dataset = cast(DatasetDict, input_dataset)
splits = list(input_dataset.keys())
for i, split in enumerate(splits):
for chunk in range(N_CHUNKS):
tokenized_dataset = tokenize_dataset(
input_dataset[split]["story"],
AutoTokenizer.from_pretrained("delphi-suite/stories-tokenizer"),
context_size=512,
batch_size=50,
)[:300001]
print(
f"Dataset split tokenization finished, length of dataset split: {len(tokenized_dataset)}. Starting to upload chunks to HF..."
)

n_chunks = math.ceil(len(tokenized_dataset) / CHUNK_SIZE)
for chunk_idx in range(n_chunks):
ds_chunk = Dataset.from_dict(
{
"tokens": [
[chunk] + random.sample(range(10), 5) for _ in range(CHUNK_SIZE)
"tokens": tokenized_dataset[
chunk_idx * CHUNK_SIZE : (chunk_idx + 1) * CHUNK_SIZE
]
}
)

ds_parquet_chunk = io.BytesIO()
ds_chunk.to_parquet(ds_parquet_chunk)
api.upload_file(
path_or_fileobj=ds_parquet_chunk,
path_in_repo=f"data/{split}-{chunk:05}-of-{N_CHUNKS:05}.parquet",
path_in_repo=f"data/{split}-{chunk_idx+1:05}-of-{n_chunks:05}.parquet",
repo_id=args.output_dataset,
repo_type="dataset",
)
print(
f"Chunk {split}-{chunk_idx+1:05}-of-{n_chunks:05} uploaded to HuggingFace."
)

print("Done.", flush=True)
48 changes: 36 additions & 12 deletions scripts/tokenize_dataset.py
@@ -1,9 +1,12 @@
#!/usr/bin/env python3

import argparse
import io
import math
from typing import cast

from datasets import Dataset, DatasetDict, load_dataset
from huggingface_hub import HfApi
from transformers import AutoTokenizer

from delphi.dataset.tokenization import tokenize_dataset
@@ -48,9 +51,17 @@
"--batch-size",
type=int,
default=50,
help="Batch size of text inputs into the tokenizer",
help="Size of input into batched tokenization",
)
parser.add_argument(
"--chunk-size",
type=int,
default=200000,
help="Size of the chunked datasets to upload to HuggingFace",
)
args = parser.parse_args()
api = HfApi(token=args.hf_token)
# api.create_repo(repo_id=args.output_dataset, repo_type="dataset")

print(f"Loading dataset '{args.input_dataset}'...")
input_dataset = load_dataset(args.input_dataset)
@@ -61,9 +72,9 @@
assert tokenizer.eos_token_id is not None, "Tokenizer must have a eos_token_id"

splits = list(input_dataset.keys())
tokenized_datasets = {} # dict that will hold tokenized vers. of each dataset split
print(f"{splits=}")

CHUNK_SIZE = args.chunk_size
for i, split in enumerate(splits):
text_docs = input_dataset[split]
assert (
@@ -77,18 +88,31 @@
context_size=args.context_size,
batch_size=args.batch_size,
)
# Store the tokenized data in a new dataset for this split
tokenized_datasets[split] = Dataset.from_dict({"tokens": tokenized_dataset})
print(
f"Dataset {split} split tokenization finished, length of {split} split: {len(tokenized_dataset)}. Starting to upload chunks to HF..."
)

# Create a new dataset with the same structure (splits) as the original dataset, but with tokenized data
output_dataset = DatasetDict(tokenized_datasets)
n_chunks = math.ceil(len(tokenized_dataset) / CHUNK_SIZE)
for chunk_idx in range(n_chunks):
ds_chunk = Dataset.from_dict(
{
"tokens": tokenized_dataset[
chunk_idx * CHUNK_SIZE : (chunk_idx + 1) * CHUNK_SIZE
]
}
)

print("Tokenizaton completed. Uploading dataset to Huggingface.")
ds_parquet_chunk = io.BytesIO()
ds_chunk.to_parquet(ds_parquet_chunk)
api.upload_file(
path_or_fileobj=ds_parquet_chunk,
path_in_repo=f"data/{split}-{chunk_idx+1:05}-of-{n_chunks:05}.parquet",
repo_id=args.output_dataset,
repo_type="dataset",
)

output_dataset.push_to_hub(
repo_id=args.output_dataset,
private=False,
token=args.hf_token,
)
print(
f"Chunk {split}-{chunk_idx+1:05}-of-{n_chunks:05} uploaded to HuggingFace."
)

print("Done.", flush=True)
2 changes: 2 additions & 0 deletions src/delphi/dataset/tokenization.py
@@ -25,6 +25,7 @@ def extend_deque(
text_documents: List of (untokenized) text documents to be tokenized.
doc_idx: Index of the current text story.
tokenizer: Tokenizer to encode the text strings.
batch_size: The size of input into batched tokenization.
Returns:
int: Updated index in the text documents dataset.
"""
@@ -88,6 +89,7 @@ def tokenize_dataset(
text_documents: List[str],
tokenizer,
context_size,
batch_size: The size of input into batched tokenization.
Returns:
list[list[int]]: List of token sequences of length equal to context_size.
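The docstring additions match how the scripts above call tokenize_dataset. A small usage sketch with the same tokenizer and sizes as the demo script; the example documents are made up:

# Usage sketch mirroring the call sites above; the input strings are illustrative.
from transformers import AutoTokenizer

from delphi.dataset.tokenization import tokenize_dataset

tokenizer = AutoTokenizer.from_pretrained("delphi-suite/stories-tokenizer")
text_documents = ["Once upon a time, ...", "The little robot ..."]  # raw stories
sequences = tokenize_dataset(
    text_documents,
    tokenizer,
    context_size=512,  # every returned sequence has exactly this many tokens
    batch_size=50,     # documents fed into each batched tokenizer call
)
# sequences is a list[list[int]], ready for Dataset.from_dict({"tokens": sequences})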

0 comments on commit 9b344e8
