fix: allow non-url paths when parquet filesystem is given
percevalw committed Jan 29, 2024
1 parent 8a5f36e commit f82221e
Showing 3 changed files with 41 additions and 126 deletions.
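
To make the fix concrete, here is a minimal sketch of the call this commit enables, assuming a local parquet dataset at a placeholder path; an explicit `filesystem` argument is now accepted together with a plain, non-URL path:

```python
# Illustrative sketch only; "path/to/parquet" is a placeholder.
import pyarrow.fs

import edsnlp

fs = pyarrow.fs.LocalFileSystem()

# A plain path (no "file://" or "s3://" scheme) can now be combined with an
# explicitly provided filesystem. Before this commit, the path was always run
# through URI inference, which could clash with the given filesystem.
docs = list(
    edsnlp.data.read_parquet(
        "path/to/parquet",
        converter="omop",
        filesystem=fs,
    )
)
```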
6 changes: 6 additions & 0 deletions changelog.md
@@ -1,5 +1,11 @@
# Changelog

## Unreleased

### Fixed

Allow non-URL paths when a parquet `filesystem` is given

## v0.10.4

### Changed
158 changes: 32 additions & 126 deletions edsnlp/data/parquet.py
@@ -30,17 +30,21 @@ def __init__(
filesystem: Optional[pyarrow.fs.FileSystem] = None,
):
super().__init__()
path = (
path
if isinstance(path, Path) or "://" in path
else f"file://{os.path.abspath(path)}"
)
inferred_fs, fs_path = pyarrow.fs.FileSystem.from_uri(path)
filesystem = filesystem or inferred_fs
assert inferred_fs.type_name == filesystem.type_name, (
f"Protocol {inferred_fs.type_name} in path does not match "
f"filesystem {filesystem.type_name}"
)
# Either the filesystem has not been passed
# or the path is a URL (e.g. s3://) => we need to infer the filesystem
fs_path = path
if filesystem is None or (isinstance(path, str) and "://" in path):
path = (
path
if isinstance(path, Path) or "://" in path
else f"file://{os.path.abspath(path)}"
)
inferred_fs, fs_path = pyarrow.fs.FileSystem.from_uri(path)
filesystem = filesystem or inferred_fs
assert inferred_fs.type_name == filesystem.type_name, (
f"Protocol {inferred_fs.type_name} in path does not match "
f"filesystem {filesystem.type_name}"
)
self.read_in_worker = read_in_worker
self.dataset = pyarrow.dataset.dataset(
fs_path, format="parquet", filesystem=filesystem
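
In the reader, URI inference now only runs when no filesystem was supplied or when the path itself carries a scheme; otherwise the given filesystem and the raw path are used as-is. A standalone sketch of that resolution step, with a hypothetical helper name, mirroring the branch above:

```python
import os
from pathlib import Path
from typing import Optional, Tuple, Union

import pyarrow.fs


def resolve_fs_and_path(
    path: Union[str, Path],
    filesystem: Optional[pyarrow.fs.FileSystem] = None,
) -> Tuple[pyarrow.fs.FileSystem, Union[str, Path]]:
    """Hypothetical helper mirroring the resolution logic added in this commit."""
    fs_path = path
    if filesystem is None or (isinstance(path, str) and "://" in path):
        # Infer a filesystem only when none was given, or when the path is an
        # explicit URL (e.g. "s3://bucket/dir") that must match the filesystem.
        path = (
            path
            if isinstance(path, Path) or "://" in path
            else f"file://{os.path.abspath(path)}"
        )
        inferred_fs, fs_path = pyarrow.fs.FileSystem.from_uri(path)
        filesystem = filesystem or inferred_fs
        assert inferred_fs.type_name == filesystem.type_name, (
            f"Protocol {inferred_fs.type_name} in path does not match "
            f"filesystem {filesystem.type_name}"
        )
    return filesystem, fs_path
```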
@@ -86,17 +90,20 @@ def __init__(
filesystem: Optional[pyarrow.fs.FileSystem] = None,
):
super().__init__()
path = (
path
if isinstance(path, Path) or "://" in path
else f"file://{os.path.abspath(path)}"
)
inferred_fs, fs_path = pyarrow.fs.FileSystem.from_uri(path)
filesystem = filesystem or inferred_fs
assert inferred_fs.type_name == filesystem.type_name, (
f"Protocol {inferred_fs.type_name} in path does not match "
f"filesystem {filesystem.type_name}"
)
fs_path = path
if filesystem is None or (isinstance(path, str) and "://" in path):
path = (
path
if isinstance(path, Path) or "://" in path
else f"file://{os.path.abspath(path)}"
)
inferred_fs, fs_path = pyarrow.fs.FileSystem.from_uri(path)
filesystem = filesystem or inferred_fs
assert inferred_fs.type_name == filesystem.type_name, (
f"Protocol {inferred_fs.type_name} in path does not match "
f"filesystem {filesystem.type_name}"
)
path = fs_path
# Check that filesystem has the same protocol as indicated by path
filesystem.create_dir(fs_path, recursive=True)
if overwrite is False:
@@ -162,7 +169,9 @@ def write_main(self, fragments: Iterable[List[Union[pyarrow.Table, Path]]]):
root_path=self.path,
filesystem=self.filesystem,
)
return pyarrow.dataset.dataset(self.path)
return pyarrow.dataset.dataset(
self.path, format="parquet", filesystem=self.filesystem
)
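
The dataset returned by `write_main` now also receives the explicit `format` and `filesystem`, so pyarrow does not re-infer a filesystem from a path that may not be a URL. A small runnable sketch of the idea, using temporary local files as placeholders:

```python
import os
import tempfile

import pyarrow as pa
import pyarrow.dataset
import pyarrow.fs
import pyarrow.parquet as pq

tmp = tempfile.mkdtemp()
pq.write_table(pa.table({"note_id": [1, 2]}), os.path.join(tmp, "part-0.parquet"))

fs = pyarrow.fs.LocalFileSystem()
# Passing format and filesystem explicitly avoids re-inferring them from the
# (possibly non-URL) path when the dataset is reopened.
ds = pyarrow.dataset.dataset(tmp, format="parquet", filesystem=fs)
print(ds.to_table().num_rows)  # 2
```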


@registry.readers.register("parquet")
@@ -174,53 +183,6 @@ def read_parquet(
filesystem: Optional[pyarrow.fs.FileSystem] = None,
**kwargs,
) -> LazyCollection:
"""
The ParquetReader (or `edsnlp.data.read_parquet`) reads a directory of parquet files
(or a single file) and yields documents.
Example
-------
```{ .python .no-check }
import edsnlp
nlp = edsnlp.blank("eds")
nlp.add_pipe(...)
doc_iterator = edsnlp.data.read_parquet("path/to/parquet", converter="omop")
annotated_docs = nlp.pipe(doc_iterator)
```
!!! note "Generator vs list"
`edsnlp.data.read_parquet` returns a
[LazyCollection][edsnlp.core.lazy_collection.LazyCollection].
To iterate over the documents multiple times efficiently or to access them by
index, you must convert it to a list
```{ .python .no-check }
docs = list(edsnlp.data.read_parquet("path/to/parquet", converter="omop"))
```
Parameters
----------
path: Union[str, Path]
Path to the directory containing the parquet files (will recursively look for
files in subdirectories). Supports any filesystem supported by pyarrow.
converter: Optional[Union[str, Callable]]
Converter to use to convert the parquet rows of the data source to Doc objects
read_in_worker: bool
Whether to read the files in the worker or in the main process.
filesystem: Optional[pyarrow.fs.FileSystem]
The filesystem to use to read the files. If None, the filesystem will be
inferred from the path (e.g. `s3://` will use S3).
kwargs:
Additional keyword arguments to pass to the converter. These are documented
on the [Data schemas](/data/schemas) page.
Returns
-------
LazyCollection
"""
data = LazyCollection(
reader=ParquetReader(
path,
@@ -247,62 +209,6 @@ def write_parquet(
converter: Optional[Union[str, Callable]],
**kwargs,
) -> None:
"""
`edsnlp.data.write_parquet` writes a list of documents as a parquet dataset.
Example
-------
```{ .python .no-check }
import edsnlp
nlp = edsnlp.blank("eds")
nlp.add_pipe(...)
doc = nlp("My document with entities")
edsnlp.data.write_parquet([doc], "path/to/parquet")
```
!!! warning "Overwriting files"
By default, `write_parquet` will raise an error if the directory already exists
and contains parquet files. This is to avoid overwriting existing annotations.
To allow overwriting existing files, use `overwrite=True`.
Parameters
----------
data: Union[Any, LazyCollection],
The data to write (either a list of documents or a LazyCollection).
path: Union[str, Path]
Path to the directory containing the parquet files (will recursively look for
files in subdirectories). Supports any filesystem supported by pyarrow.
num_rows_per_file: int
The maximum number of documents to write in each parquet file.
overwrite: bool
Whether to overwrite existing directories.
write_in_worker: bool
Whether to write the files in the workers or in the main process.
accumulate: bool
Whether to accumulate the results sent to the writer by workers until the
batch is full or the writer is finalized. If False, each file will not be larger
than the size of the batches it receives. This option requires that the writer
is finalized before the end of the processing, which may not be compatible with
some backends, such as `spark`.
If `write_in_worker` is True, documents will be accumulated in each worker but
not across workers, therefore leading to a larger number of files.
converter: Optional[Union[str, Callable]]
Converter to use to convert the documents to dictionary objects before writing
them.
filesystem: Optional[pyarrow.fs.FileSystem]
The filesystem to use to write the files. If None, the filesystem will be
inferred from the path (e.g. `s3://` will use S3).
kwargs:
Additional keyword arguments to pass to the converter. These are documented
on the [Data schemas](/data/schemas) page.
"""

data = LazyCollection.ensure_lazy(data)
if converter:
converter, kwargs = get_doc2dict_converter(converter, kwargs)
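
The writer accepts the same combination; a symmetric sketch with placeholder paths, based on the usage shown in the removed docstring above:

```python
# Illustrative sketch only; "path/to/output" is a placeholder.
import pyarrow.fs

import edsnlp

fs = pyarrow.fs.LocalFileSystem()
nlp = edsnlp.blank("eds")
doc = nlp("My document with entities")

# A plain output path now works together with an explicit filesystem as well.
edsnlp.data.write_parquet(
    [doc],
    "path/to/output",
    converter="omop",
    filesystem=fs,
)
```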
3 changes: 3 additions & 0 deletions tests/data/test_parquet.py
@@ -1,6 +1,7 @@
from pathlib import Path

import pyarrow.dataset
import pyarrow.fs
import pytest

import edsnlp
@@ -213,12 +214,14 @@ def test_read_write_in_worker(blank_nlp, tmpdir):
def test_read_to_parquet(blank_nlp, tmpdir):
input_dir = Path(__file__).parent.parent.resolve() / "resources" / "docs.pq"
output_dir = Path(tmpdir)
fs = pyarrow.fs.LocalFileSystem()
doc = list(
edsnlp.data.read_parquet(
input_dir,
converter="omop",
span_attributes=["etat", "assertion"],
doc_attributes=["context_var"],
filesystem=fs,
)
)[0]
assert_doc_read(doc)
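
The commit only adds a read-side check; a complementary write-side test could look like the hedged sketch below (not part of this commit, and assuming the same `blank_nlp`/`tmpdir` fixtures used elsewhere in this module):

```python
def test_write_to_parquet_with_filesystem(blank_nlp, tmpdir):
    # Hypothetical companion test, not included in this commit.
    fs = pyarrow.fs.LocalFileSystem()
    output_dir = Path(tmpdir) / "out.pq"
    doc = blank_nlp("Le patient est admis pour une pneumopathie.")
    edsnlp.data.write_parquet(
        [doc],
        str(output_dir),  # plain, non-URL path combined with an explicit filesystem
        converter="omop",
        filesystem=fs,
    )
    table = pyarrow.dataset.dataset(
        str(output_dir), format="parquet", filesystem=fs
    ).to_table()
    assert table.num_rows >= 1
```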
