Skip to content

Commit

Permalink
feat: add edsnlp.data support for parquet files with parallel reading…
Browse files Browse the repository at this point in the history
… / writing
  • Loading branch information
percevalw committed Dec 4, 2023
1 parent c9c545c commit 3ec32ab
Showing 1 changed file with 5 additions and 5 deletions.
10 changes: 5 additions & 5 deletions edsnlp/data/parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,11 +81,11 @@ def __init__(
if isinstance(path, Path) or "://" in path
else f"file://{os.path.abspath(path)}"
)
fs, path = pyarrow.fs.FileSystem.from_uri(path)
fs, fs_path = pyarrow.fs.FileSystem.from_uri(path)
fs: pyarrow.fs.FileSystem
fs.create_dir(path, recursive=True)
fs.create_dir(fs_path, recursive=True)
if overwrite is False:
dataset = pyarrow.dataset.dataset(path, format="parquet", filesystem=fs)
dataset = pyarrow.dataset.dataset(fs_path, format="parquet", filesystem=fs)
if len(list(dataset.get_fragments())):
raise FileExistsError(
f"Directory {path} already exists and is not empty. "
Expand All @@ -104,7 +104,6 @@ def __init__(
def write_worker(self, records, last=False):
# Results will contain a batches of samples ready to be written (or None if
# write_in_worker is True) and they have already been written.
n_to_fill = self.num_rows_per_file - len(self.batch)
results = []
count = 0

Expand All @@ -115,9 +114,10 @@ def write_worker(self, records, last=False):
# While there is something to write
greedy = last or not self.accumulate
while len(records) or greedy and len(self.batch):
n_to_fill = self.num_rows_per_file - len(self.batch)
self.batch.extend(records[:n_to_fill])
records = records[n_to_fill:]
if greedy or len(self.batch) == self.num_rows_per_file:
if greedy or len(self.batch) >= self.num_rows_per_file:
fragment = pyarrow.Table.from_pydict(ld_to_dl(self.batch)) # type: ignore
count += len(self.batch)
self.batch = []
Expand Down

0 comments on commit 3ec32ab

Please sign in to comment.