diff --git a/fineweb.py b/fineweb.py
index 493c463..dc72f21 100644
--- a/fineweb.py
+++ b/fineweb.py
@@ -41,42 +41,43 @@ def tokenize(doc):
 def write_datafile(filename, tokens_np):
     np.save(filename, tokens_np)
 
-# tokenize all documents and write output shards, each of shard_size tokens (last shard has remainder)
-nprocs = max(1, os.cpu_count()//2)
-with mp.Pool(nprocs) as pool:
-    shard_index = 0
-    # preallocate buffer to hold current shard
-    all_tokens_np = np.empty((shard_size,), dtype=np.uint16)
-    token_count = 0
-    progress_bar = None
-    for tokens in pool.imap(tokenize, fw, chunksize=16):
+if __name__ == '__main__':
+    # tokenize all documents and write output shards, each of shard_size tokens (last shard has remainder)
+    nprocs = max(1, os.cpu_count()//2)
+    with mp.Pool(nprocs) as pool:
+        shard_index = 0
+        # preallocate buffer to hold current shard
+        all_tokens_np = np.empty((shard_size,), dtype=np.uint16)
+        token_count = 0
+        progress_bar = None
+        for tokens in pool.imap(tokenize, fw, chunksize=16):
 
-        # is there enough space in the current shard for the new tokens?
-        if token_count + len(tokens) < shard_size:
-            # simply append tokens to current shard
-            all_tokens_np[token_count:token_count+len(tokens)] = tokens
-            token_count += len(tokens)
-            # update progress bar
-            if progress_bar is None:
-                progress_bar = tqdm(total=shard_size, unit="tokens", desc=f"Shard {shard_index}")
-            progress_bar.update(len(tokens))
-        else:
-            # write the current shard and start a new one
+            # is there enough space in the current shard for the new tokens?
+            if token_count + len(tokens) < shard_size:
+                # simply append tokens to current shard
+                all_tokens_np[token_count:token_count+len(tokens)] = tokens
+                token_count += len(tokens)
+                # update progress bar
+                if progress_bar is None:
+                    progress_bar = tqdm(total=shard_size, unit="tokens", desc=f"Shard {shard_index}")
+                progress_bar.update(len(tokens))
+            else:
+                # write the current shard and start a new one
+                split = "val" if shard_index == 0 else "train"
+                filename = os.path.join(DATA_CACHE_DIR, f"edufineweb_{split}_{shard_index:06d}")
+                # split the document into whatever fits in this shard; the remainder goes to next one
+                remainder = shard_size - token_count
+                progress_bar.update(remainder)
+                all_tokens_np[token_count:token_count+remainder] = tokens[:remainder]
+                write_datafile(filename, all_tokens_np)
+                shard_index += 1
+                progress_bar = None
+                # populate the next shard with the leftovers of the current doc
+                all_tokens_np[0:len(tokens)-remainder] = tokens[remainder:]
+                token_count = len(tokens)-remainder
+
+        # write any remaining tokens as the last shard
+        if token_count != 0:
             split = "val" if shard_index == 0 else "train"
             filename = os.path.join(DATA_CACHE_DIR, f"edufineweb_{split}_{shard_index:06d}")
-            # split the document into whatever fits in this shard; the remainder goes to next one
-            remainder = shard_size - token_count
-            progress_bar.update(remainder)
-            all_tokens_np[token_count:token_count+remainder] = tokens[:remainder]
-            write_datafile(filename, all_tokens_np)
-            shard_index += 1
-            progress_bar = None
-            # populate the next shard with the leftovers of the current doc
-            all_tokens_np[0:len(tokens)-remainder] = tokens[remainder:]
-            token_count = len(tokens)-remainder
-
-    # write any remaining tokens as the last shard
-    if token_count != 0:
-        split = "val" if shard_index == 0 else "train"
-        filename = os.path.join(DATA_CACHE_DIR, f"edufineweb_{split}_{shard_index:06d}")
-        write_datafile(filename, all_tokens_np[:token_count])
+            write_datafile(filename, all_tokens_np[:token_count])
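The change wraps the shard-writing loop in an `if __name__ == '__main__':` guard without altering its logic. The guard matters because under the "spawn" start method (the default on Windows and macOS), each `mp.Pool` worker re-imports the main module; without the guard, the pool-creating code at module top level would re-run in every child, and multiprocessing aborts the bootstrap with a RuntimeError. A minimal, self-contained sketch of the pattern (the `square` worker is illustrative, not from fineweb.py):

```python
import multiprocessing as mp
import os

def square(x):
    # worker must be defined at module top level so spawned
    # children can import and find it by name
    return x * x

if __name__ == '__main__':
    # guarded: this block runs only in the parent process, not when
    # spawned workers re-import this module during bootstrapping
    nprocs = max(1, os.cpu_count() // 2)
    with mp.Pool(nprocs) as pool:
        print(pool.map(square, range(8)))  # [0, 1, 4, 9, 16, 25, 36, 49]
```

On Linux, where "fork" is still the default start method, the unguarded version happens to work, which is presumably why the original script ran fine there; the guard makes the behavior portable.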