From 467955691ce3af3c3239638a5f341dfa7a0771ce Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Perceval=20Wajsb=C3=BCrt?=
Date: Wed, 29 Nov 2023 16:08:28 +0100
Subject: [PATCH] feat: add edsnlp.data support for parquet files with parallel reading / writing

---
 docs/data/parquet.md              |  41 +++++
 edsnlp/data/__init__.py           |   1 +
 edsnlp/data/parquet.py            | 281 ++++++++++++++++++++++++++
 edsnlp/utils/collections.py       |  14 +-
 mkdocs.yml                        |   1 +
 pyproject.toml                    |   1 +
 tests/conftest.py                 |  13 +-
 tests/data/test_parquet.py        | 247 ++++++++++++++++++++++++++
 tests/processing/test_backends.py |  60 ++++++-
 tests/resources/docs.pq           | Bin 0 -> 9219 bytes
 10 files changed, 646 insertions(+), 13 deletions(-)
 create mode 100644 tests/data/test_parquet.py
 create mode 100644 tests/resources/docs.pq

diff --git a/docs/data/parquet.md b/docs/data/parquet.md
index e69de29bb..029427471 100644
--- a/docs/data/parquet.md
+++ b/docs/data/parquet.md
@@ -0,0 +1,41 @@
+# Parquet
+
+??? abstract "TLDR"
+
+    ```{ .python .no-check }
+    import edsnlp
+
+    iterator = edsnlp.data.read_parquet(source_path, converter="omop")
+    docs = nlp.pipe(iterator)
+    res = edsnlp.data.write_parquet(dest_path, docs, converter="omop")
+    ```
+
+We provide methods to read and write documents (raw or annotated) from and to parquet files.
+
+As an example, imagine that we have the following documents, stored using the OMOP schema (parquet files are not actually stored as human-readable text; this is just for the sake of the example):
+
+```{ title="data.pq" }
+{ "note_id": 0, "note_text": "Le patient ...", "note_datetime": "2021-10-23", "entities": [...] }
+{ "note_id": 1, "note_text": "Autre doc ...", "note_datetime": "2022-12-24", "entities": [] }
+...
+```
+
+You could also have multiple parquet files in a directory; the reader will read them all.
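+
+As a minimal sketch of the parallel reading / writing options added here, the snippet below reads a directory of parquet files and writes the annotated documents back, letting the workers themselves parse and write the files (`read_in_worker` / `write_in_worker`). The paths and the `nlp` pipeline are placeholders to adapt to your own setup:
+
+```{ .python .no-check }
+import edsnlp
+
+# With read_in_worker=True, each worker reads and parses its own parquet
+# fragments instead of receiving already-parsed rows from the main process.
+docs = edsnlp.data.read_parquet(
+    "path/to/parquet/directory",  # placeholder input directory
+    converter="omop",
+    read_in_worker=True,
+)
+docs = nlp.pipe(docs)
+
+# With write_in_worker=True, each worker writes its own parquet fragments.
+edsnlp.data.write_parquet(
+    docs,
+    "path/to/output/directory",  # placeholder output directory
+    converter="omop",
+    write_in_worker=True,
+)
+```
+
+Both options default to `False`, in which case the files are parsed and written in the main process.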
+ +## Reading Parquet files {: #edsnlp.data.parquet.read_parquet } + +::: edsnlp.data.parquet.read_parquet + options: + heading_level: 3 + show_source: false + show_toc: false + show_bases: false + +## Writing Parquet files {: #edsnlp.data.parquet.write_parquet } + +::: edsnlp.data.parquet.write_parquet + options: + heading_level: 3 + show_source: false + show_toc: false + show_bases: false diff --git a/edsnlp/data/__init__.py b/edsnlp/data/__init__.py index e1e724749..29ab65f37 100644 --- a/edsnlp/data/__init__.py +++ b/edsnlp/data/__init__.py @@ -8,6 +8,7 @@ from .standoff import read_standoff, write_standoff from .brat import read_brat, write_brat from .json import read_json, write_json + from .parquet import read_parquet, write_parquet from .spark import from_spark, to_spark from .pandas import from_pandas, to_pandas from .converters import get_dict2doc_converter, get_doc2dict_converter diff --git a/edsnlp/data/parquet.py b/edsnlp/data/parquet.py index e69de29bb..67f0f17fc 100644 --- a/edsnlp/data/parquet.py +++ b/edsnlp/data/parquet.py @@ -0,0 +1,281 @@ +from itertools import chain +from pathlib import Path +from typing import Any, Callable, Iterable, List, Optional, TypeVar, Union + +import pyarrow.dataset +import pyarrow.fs +import pyarrow.parquet +from pyarrow.dataset import ParquetFileFragment + +from edsnlp import registry +from edsnlp.core.lazy_collection import LazyCollection +from edsnlp.data.base import BaseReader, BaseWriter +from edsnlp.data.converters import ( + FILENAME, + get_dict2doc_converter, + get_doc2dict_converter, +) +from edsnlp.utils.collections import dl_to_ld, flatten_once, ld_to_dl + + +class ParquetReader(BaseReader): + DATA_FIELDS = ("dataset",) + + def __init__( + self, + path: Union[str, Path], + *, + read_in_worker: bool, + ): + super().__init__() + self.path = Path(path) + self.read_in_worker = read_in_worker + self.dataset = pyarrow.dataset.dataset(self.path, format="parquet") + + def read_main(self): + fragments: List[ParquetFileFragment] = self.dataset.get_fragments() + if self.read_in_worker: + # read in worker -> each task is a file to read from + return ((f, f.metadata.num_rows) for f in fragments) + else: + # read in worker -> each task is a non yet parsed line + return ( + (line, 1) + for f in fragments + for batch in f.to_table().to_batches(1024) + for line in dl_to_ld(batch.to_pydict()) + ) + + def read_worker(self, tasks): + if self.read_in_worker: + tasks = list( + chain.from_iterable( + dl_to_ld(batch.to_pydict()) + for task in tasks + for batch in task.to_table().to_batches(1024) + ) + ) + return tasks + + +T = TypeVar("T") + + +class ParquetWriter(BaseWriter): + def __init__( + self, + path: Union[str, Path], + num_rows_per_file: int, + overwrite: bool, + write_in_worker: bool, + accumulate: bool = True, + ): + super().__init__() + fs, path = pyarrow.fs.FileSystem.from_uri(path) + fs: pyarrow.fs.FileSystem + fs.create_dir(path, recursive=True) + if overwrite is False: + dataset = pyarrow.dataset.dataset(path, format="parquet", filesystem=fs) + if len(list(dataset.get_fragments())): + raise FileExistsError( + f"Directory {path} already exists and is not empty. " + "Use overwrite=True to overwrite." 
+ ) + self.path = Path(path) + self.write_in_worker = write_in_worker + self.batch = [] + self.num_rows_per_file = num_rows_per_file + self.closed = False + self.finalized = False + self.accumulate = accumulate + if not self.accumulate: + self.finalize = super().finalize + + def write_worker(self, records, last=False): + # Results will contain a batches of samples ready to be written (or None if + # write_in_worker is True) and they have already been written. + n_to_fill = self.num_rows_per_file - len(self.batch) + results = [] + count = 0 + + for rec in records: + if isinstance(rec, dict): + rec.pop(FILENAME, None) + + # While there is something to write + greedy = last or not self.accumulate + while len(records) or greedy and len(self.batch): + self.batch.extend(records[:n_to_fill]) + records = records[n_to_fill:] + if greedy or len(self.batch) == self.num_rows_per_file: + fragment = pyarrow.Table.from_pydict(ld_to_dl(self.batch)) # type: ignore + count += len(self.batch) + self.batch = [] + if self.write_in_worker: + pyarrow.parquet.write_to_dataset( + table=fragment, + root_path=self.path, + ) + fragment = None + results.append(fragment) + return results, count + + def finalize(self): + if not self.finalized: + self.finalized = True + return self.write_worker([], last=True) + + def write_main(self, fragments: Iterable[List[Union[pyarrow.Table, Path]]]): + fragments = list(fragments) + for table in flatten_once(fragments): + if not self.write_in_worker: + pyarrow.parquet.write_to_dataset( + table=table, + root_path=self.path, + ) + return pyarrow.dataset.dataset(self.path) + + +@registry.readers.register("parquet") +def read_parquet( + path: Union[str, Path], + converter: Union[str, Callable], + *, + read_in_worker: bool = False, + **kwargs, +) -> LazyCollection: + """ + The ParquetReader (or `edsnlp.data.read_parquet`) reads a directory of parquet files + (or a single file) and yields documents. + + Example + ------- + ```{ .python .no-check } + + import edsnlp + + nlp = edsnlp.blank("eds") + nlp.add_pipe(...) + doc_iterator = edsnlp.data.read_parquet("path/to/parquet/directory", nlp=nlp) + annotated_docs = nlp.pipe(doc_iterator) + ``` + + !!! note "Generator vs list" + + `edsnlp.data.read_parquet` returns a + [LazyCollection][edsnlp.core.lazy_collection.LazyCollection]. + To iterate over the documents multiple times efficiently or to access them by + index, you must convert it to a list + + ```{ .python .no-check } + docs = list(edsnlp.data.read_parquet("path/to/parquet/directory", nlp=nlp)) + ``` + + Parameters + ---------- + path: Union[str, Path] + Path to the directory containing the parquet files (will recursively look for + files in subdirectories). + converter: Union[str, Callable] + Converter to use to convert the parquet rows of the data source to Doc objects + read_in_worker: bool + Whether to read the files in the worker or in the main process. + kwargs: + Additional keyword arguments to pass to the converter. These are documented + on the [Data schemas](/data/schemas) page. 
+ + Returns + ------- + LazyCollection + """ + data = LazyCollection( + reader=ParquetReader( + path, + read_in_worker=read_in_worker, + ) + ) + if converter: + converter, kwargs = get_dict2doc_converter(converter, kwargs) + data = data.map(converter, kwargs=kwargs) + return data + + +@registry.writers.register("parquet") +def write_parquet( + data: Union[Any, LazyCollection], + path: Union[str, Path], + *, + write_in_worker: bool = False, + num_rows_per_file: int = 1024, + overwrite: bool = False, + accumulate: bool = True, + converter: Optional[Union[str, Callable]], + **kwargs, +) -> None: + """ + `edsnlp.data.write_parquet` writes a list of documents as a parquet dataset. + + Example + ------- + ```{ .python .no-check } + + import edsnlp + + nlp = edsnlp.blank("eds") + nlp.add_pipe(...) + + doc = nlp("My document with entities") + + edsnlp.data.write_parquet([doc], "path/to/parquet/directory") + ``` + + !!! warning "Overwriting files" + + By default, `write_parquet` will raise an error if the directory already exists + and contains parquet files. This is to avoid overwriting existing annotations. + To allow overwriting existing files, use `overwrite=True`. + + Parameters + ---------- + data: Union[Any, LazyCollection], + The data to write (either a list of documents or a LazyCollection). + path: Union[str, Path] + Path to the directory containing the parquet files (will recursively look for + files in subdirectories). + num_rows_per_file: int + The maximum number of documents to write in each parquet file. + overwrite: bool + Whether to overwrite existing directories. + write_in_worker: bool + Whether to write the files in the workers or in the main process. + accumulate: bool + Whether to accumulate the results sent to the writer by workers until the + batch is full or the writer is finalized. If False, each file will not be larger + than the size of the batches it receives. This option requires that the writer + is finalized before the end of the processing, which may not be compatible with + some backends, such as `spark`. + + If `write_in_worker` is True, documents will be accumulated in each worker but + not across workers, therefore leading to a larger number of files. + converter: Optional[Union[str, Callable]] + Converter to use to convert the documents to dictionary objects before writing + them. + kwargs: + Additional keyword arguments to pass to the converter. These are documented + on the [Data schemas](/data/schemas) page. 
+ """ + + data = LazyCollection.ensure_lazy(data) + if converter: + converter, kwargs = get_doc2dict_converter(converter, kwargs) + data = data.map(converter, kwargs=kwargs) + + return data.write( + ParquetWriter( + path, + num_rows_per_file=num_rows_per_file, + overwrite=overwrite, + write_in_worker=write_in_worker, + accumulate=accumulate, + ) + ) diff --git a/edsnlp/utils/collections.py b/edsnlp/utils/collections.py index 78662231f..8ed05db93 100644 --- a/edsnlp/utils/collections.py +++ b/edsnlp/utils/collections.py @@ -3,7 +3,17 @@ import math import sys from collections import defaultdict -from typing import Any, Dict, Iterable, List, Mapping, Sequence, TypeVar, Union +from typing import ( + Any, + Dict, + Iterable, + Iterator, + List, + Mapping, + Sequence, + TypeVar, + Union, +) It = TypeVar("It", bound=Iterable) T = TypeVar("T") @@ -27,7 +37,7 @@ def ld_to_dl(ld: Iterable[Mapping[str, T]]) -> Dict[str, List[T]]: return {k: [dic.get(k) for dic in ld] for k in ld[0]} -def dl_to_ld(dl: Mapping[str, Sequence[Any]]) -> Iterable[Dict[str, Any]]: +def dl_to_ld(dl: Mapping[str, Sequence[Any]]) -> Iterator[Dict[str, Any]]: """ Convert a dictionary of lists to a list of dictionaries diff --git a/mkdocs.yml b/mkdocs.yml index a62b9c133..588f869a8 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -127,6 +127,7 @@ nav: - data/index.md - data/standoff.md - data/json.md + - data/parquet.md - data/pandas.md - data/spark.md - data/schemas.md diff --git a/pyproject.toml b/pyproject.toml index 3b324299b..b7b03acf1 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -31,6 +31,7 @@ dependencies = [ # Packaging "build>=1.0.0", "toml", + "pyarrow>=2.0.0", ] [project.optional-dependencies] dev = [ diff --git a/tests/conftest.py b/tests/conftest.py index 0db86d970..8526cae8b 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -12,8 +12,7 @@ def lang(request): return request.param -@fixture(scope="session") -def nlp(lang): +def make_nlp(lang): if lang == "eds": model = spacy.blank("eds") else: @@ -52,6 +51,16 @@ def nlp(lang): return model +@fixture(scope="session") +def nlp(lang): + return make_nlp(lang) + + +@fixture(scope="session") +def nlp_eds(): + return make_nlp("eds") + + @fixture def blank_nlp(lang): if lang == "eds": diff --git a/tests/data/test_parquet.py b/tests/data/test_parquet.py new file mode 100644 index 000000000..49c5b55d6 --- /dev/null +++ b/tests/data/test_parquet.py @@ -0,0 +1,247 @@ +from pathlib import Path + +import pyarrow.dataset +import pytest + +import edsnlp +from edsnlp.utils.collections import dl_to_ld + + +def assert_doc_read(doc): + assert doc._.note_id == "subfolder/doc-1" + assert doc._.context_var == "test" + + attrs = ("etat", "assertion") + spans_and_attributes = { + "__ents__": sorted( + [ + (e.start, e.end, e.text, tuple(getattr(e._, key) for key in attrs)) + for e in doc.ents + ] + ), + **{ + name: sorted( + [ + (e.start, e.end, e.text, tuple(getattr(e._, key) for key in attrs)) + for e in doc.spans[name] + ] + ) + for name in doc.spans + }, + } + + assert spans_and_attributes == { + "__ents__": [ + (6, 7, "douleurs", (None, None)), + (7, 11, "dans le bras droit", (None, None)), + (17, 21, "problème \nde locomotion", (None, "absent")), + (25, 26, "AVC", ("passé", "non-associé")), + (35, 36, "rhume", ("présent", "hypothétique")), + (45, 46, "rhume", ("présent", "hypothétique")), + (51, 52, "Douleurs", (None, None)), + (52, 56, "dans le bras droit", (None, None)), + (68, 69, "anomalie", (None, "absent")), + ], + "anatomie": [ + (9, 11, "bras droit", (None, None)), + 
(54, 56, "bras droit", (None, None)), + ], + "localisation": [ + (7, 11, "dans le bras droit", (None, None)), + (52, 56, "dans le bras droit", (None, None)), + ], + "pathologie": [ + (17, 21, "problème \nde locomotion", (None, "absent")), + (25, 26, "AVC", ("passé", "non-associé")), + (35, 36, "rhume", ("présent", "hypothétique")), + (45, 46, "rhume", ("présent", "hypothétique")), + ], + "sosy": [ + (6, 7, "douleurs", (None, None)), + (51, 52, "Douleurs", (None, None)), + (68, 69, "anomalie", (None, "absent")), + ], + } + + +def assert_doc_write(exported_obj): + assert exported_obj == { + "entities": [ + { + "assertion": None, + "end_char": 38, + "etat": "test", + "lexical_variant": "douleurs", + "note_nlp_id": 0, + "note_nlp_source_value": "sosy", + "start_char": 30, + }, + { + "assertion": None, + "end_char": 57, + "etat": None, + "lexical_variant": "dans le bras droit", + "note_nlp_id": 1, + "note_nlp_source_value": "localisation", + "start_char": 39, + }, + { + "assertion": None, + "end_char": 57, + "etat": None, + "lexical_variant": "bras droit", + "note_nlp_id": 2, + "note_nlp_source_value": "anatomie", + "start_char": 47, + }, + { + "assertion": "absent", + "end_char": 98, + "etat": None, + "lexical_variant": "problème \nde locomotion", + "note_nlp_id": 3, + "note_nlp_source_value": "pathologie", + "start_char": 75, + }, + { + "assertion": "non-associé", + "end_char": 117, + "etat": "passé", + "lexical_variant": "AVC", + "note_nlp_id": 4, + "note_nlp_source_value": "pathologie", + "start_char": 114, + }, + { + "assertion": "hypothétique", + "end_char": 164, + "etat": "présent", + "lexical_variant": "rhume", + "note_nlp_id": 5, + "note_nlp_source_value": "pathologie", + "start_char": 159, + }, + { + "assertion": "hypothétique", + "end_char": 296, + "etat": "présent", + "lexical_variant": "rhume", + "note_nlp_id": 6, + "note_nlp_source_value": "pathologie", + "start_char": 291, + }, + { + "assertion": None, + "end_char": 314, + "etat": None, + "lexical_variant": "Douleurs", + "note_nlp_id": 7, + "note_nlp_source_value": "sosy", + "start_char": 306, + }, + { + "assertion": None, + "end_char": 333, + "etat": None, + "lexical_variant": "dans le bras droit", + "note_nlp_id": 8, + "note_nlp_source_value": "localisation", + "start_char": 315, + }, + { + "assertion": None, + "end_char": 333, + "etat": None, + "lexical_variant": "bras droit", + "note_nlp_id": 9, + "note_nlp_source_value": "anatomie", + "start_char": 323, + }, + { + "assertion": "absent", + "end_char": 386, + "etat": None, + "lexical_variant": "anomalie", + "note_nlp_id": 10, + "note_nlp_source_value": "sosy", + "start_char": 378, + }, + ], + "note_id": "subfolder/doc-1", + "context_var": "test", + "note_text": "Le patient est admis pour des douleurs dans le bras droit, mais " + "n'a pas de problème \n" + "de locomotion. \n" + "Historique d'AVC dans la famille. 
pourrait être un cas de " + "rhume.\n" + "NBNbWbWbNbWbNBNbNbWbWbNBNbWbNbNbWbNBNbWbNbNBWbWbNbNbNBWbNbWbNbWBNbNbWbNbNBNbWb" + "WbNbWBNbNbWbNBNbWbWbNb\n" + "Pourrait être un cas de rhume.\n" + "Motif :\n" + "Douleurs dans le bras droit.\n" + "ANTÉCÉDENTS\n" + "Le patient est déjà venu\n" + "Pas d'anomalie détectée.\n", + } + + +def test_read_write_in_worker(blank_nlp, tmpdir): + input_dir = Path(__file__).parent.parent.resolve() / "resources" / "docs.pq" + output_dir = Path(tmpdir) + edsnlp.data.read_parquet( + input_dir, + converter="omop", + span_attributes=["etat", "assertion"], + read_in_worker=True, + ).write_parquet( + output_dir / "docs.pq", + converter="omop", + doc_attributes=["context_var"], + span_attributes=["etat", "assertion"], + span_getter=["ents", "sosy", "localisation", "anatomie", "pathologie"], + write_in_worker=True, + ) + # fmt: off + assert ( + list(dl_to_ld(pyarrow.dataset.dataset(output_dir / "docs.pq").to_table().to_pydict())) # noqa: E501 + == list(dl_to_ld(pyarrow.dataset.dataset(input_dir).to_table().to_pydict())) + ) + # fmt: on + + +def test_read_to_parquet(blank_nlp, tmpdir): + input_dir = Path(__file__).parent.parent.resolve() / "resources" / "docs.pq" + output_dir = Path(tmpdir) + doc = list( + edsnlp.data.read_parquet( + input_dir, + converter="omop", + span_attributes=["etat", "assertion"], + doc_attributes=["context_var"], + ) + )[0] + assert_doc_read(doc) + doc.ents[0]._.etat = "test" + + edsnlp.data.write_parquet( + [doc], + output_dir, + converter="omop", + doc_attributes=["context_var"], + span_attributes=["etat", "assertion"], + span_getter=["ents", "sosy", "localisation", "anatomie", "pathologie"], + ) + + assert_doc_write( + next(dl_to_ld(pyarrow.dataset.dataset(output_dir).to_table().to_pydict())) + ) + + with pytest.raises(FileExistsError): + edsnlp.data.write_parquet( + [doc], + output_dir, + converter="omop", + doc_attributes=["context_var"], + span_attributes=["etat", "assertion"], + span_getter=["ents", "sosy", "localisation", "anatomie", "pathologie"], + ) diff --git a/tests/processing/test_backends.py b/tests/processing/test_backends.py index 6fe7313b4..0816a02c6 100644 --- a/tests/processing/test_backends.py +++ b/tests/processing/test_backends.py @@ -1,4 +1,5 @@ from itertools import chain +from pathlib import Path import pandas as pd import pytest @@ -38,24 +39,45 @@ ] -@pytest.mark.parametrize("reader_format", ["pandas"]) -@pytest.mark.parametrize("reader_converter", ["omop"]) -@pytest.mark.parametrize("backend", ["simple", "multiprocessing", "spark"]) -@pytest.mark.parametrize("writer_format", ["pandas"]) -@pytest.mark.parametrize("writer_converter", ["omop"]) +@pytest.mark.parametrize( + "reader_format,reader_converter,backend,writer_format,writer_converter,worker_io", + [ + ("pandas", "omop", "simple", "pandas", "omop", False), + ("pandas", "omop", "multiprocessing", "pandas", "omop", False), + ("pandas", "omop", "spark", "pandas", "omop", False), + ("parquet", "omop", "simple", "parquet", "omop", False), + ("parquet", "omop", "multiprocessing", "parquet", "omop", False), + ("parquet", "omop", "spark", "parquet", "omop", False), + ("parquet", "omop", "multiprocessing", "parquet", "omop", True), + ("parquet", "omop", "spark", "parquet", "omop", True), + ], +) def test_end_to_end( reader_format, reader_converter, - nlp, backend, writer_format, writer_converter, + worker_io, + nlp_eds, + tmp_path, ): + nlp = nlp_eds + rsrc = Path(__file__).parent.parent.resolve() / "resources" if reader_format == "pandas": pandas_dataframe = 
pd.DataFrame(docs) - data = edsnlp.data.from_pandas(pandas_dataframe, converter=reader_converter) + data = edsnlp.data.from_pandas( + pandas_dataframe, + converter=reader_converter, + ) + elif reader_format == "parquet": + data = edsnlp.data.read_parquet( + rsrc / "docs.pq", + converter=reader_converter, + read_in_worker=worker_io, + ) else: - raise Exception() + raise ValueError(reader_format) data = data.map_model(nlp) @@ -63,8 +85,28 @@ def test_end_to_end( if writer_format == "pandas": data.to_pandas(converter=writer_converter) + elif writer_format == "parquet": + if backend == "spark": + with pytest.raises(ValueError): + data.write_parquet( + tmp_path, + converter=writer_converter, + write_in_worker=worker_io, + ) + data.write_parquet( + tmp_path, + converter=writer_converter, + accumulate=False, + write_in_worker=worker_io, + ) + else: + data.write_parquet( + tmp_path, + converter=writer_converter, + write_in_worker=worker_io, + ) else: - raise Exception() + raise ValueError(writer_format) def test_multiprocessing_backend(frozen_ml_nlp): diff --git a/tests/resources/docs.pq b/tests/resources/docs.pq new file mode 100644 index 0000000000000000000000000000000000000000..90c0b7f47e1f0c25adcb202e47654557c9ff213a GIT binary patch literal 9219 zcmeHNTWlj|6`pY%uajoG>F&80s`dok^dfD9Ub#Ra27{YhE^` z*)T&frx|tzx{NbE#y7b{O`q!P66mi0DI-A-Ts<`Hp}61Elg!vebe!@{1Wp*Hqim`& z<6%AQFKG5RG?(N2|DDu{8Nf?19ttkwaHA+Fk27aG`2CY!sB;wC835Xg^#OA2gLlh& z0PhjWG#q8u+1wE<0`VrzzC?3x4RJ3GA;?}S1L3^kV?OY90KGc&!M9(bDJrW3>w=>y zh7$x3K|$74J6JaxR!~;#plmiYrD4H?VAw%T35u2g1OnV>E}SL2)j0-z4$Sf;4$ zzpg96Q7CDqWa_4)n#NpkG_Bf>X{j$Xl%RZ$FC^OXgkV+BRZUaog0y|n5>zL+|6|8e zf(;`m(c52Bf>y1eD|4f{cup+BKiWr+(SE8pJnAosx#34?c6f}6k4$@WB0Wk^k3+9` zj`j_orE-s@A3MvO9v@wYxftJ~f@^@K8oV^RTz#CrJU7bcHuhgm?7zIclH0gCdg&W< zNZ!A_z5m1Dj$$-M*XciPoD&RF7c>_(mKJ2hA{jLGF)_9|XA(5`!_!kr4Oc8TO6S+?%7^kA^YCJv8WpY0!r&4TI$a7y_w* zi9ms=WFcX~yz0X1K3I){XhQ-p22X}*%nG(`O6vaYlTfa;>ZVhJ_aDBy;oUdGgk~7F zlW&bpkM#}+_*_9Sj@=l7>+ru%oWR?=Y3p+|N;C{-c%u)P}O2VT`t z^bt_5SeP}y`e)e>Y4%?<_Y;nL>&eNbdubau$6GWbW+6SqNv6FVH9}LdkrQ4@Jn5$z zaK3iZ@13ScMyN|C{g>Au5yIn}?K@0#j)@#hG~n;F1pJ)am13#s0kzQhc&R;g%mP`j zcX<5s38m?$DDKXMNyh6Yn}g;KbjX`hjbwK$`;Ldb`tRJ##(t4Mo=pi5X=ec9e z>Qx*8BRb2zO|!qIx%X$eS7#>&>KHhIztGGd=q$J$PQeKw@J?!anjTv}lAMl0hlxku z4v+~B&Ps6}kcke%jbukCbNS@gXYk(1{Em=9O%IdKfgug@KNzgErR|O!S__~LE&%@n{aNi5zyRKK!Ku&&2Gg=nbB2rSKsFO5BgI{~# z9D>se=P;Z;I8VUo2R9!9#CRY(cpP=}PD=n;AP2hn(OPIa>gJJ+o$o?d?xUq+-TV>q ziZ`^nyVYXZe|Xq8J=|}j+}%a+^rOn|0bw91GE>Vke!@xQE^fJO({5p4gY5|9@-}RB zOv5#?;F!9qpgPz9)lALYQZd58rn6@iz_tn{1pI>G_J{6g(uWqfXZXFZj!@kDmyR_# z)19>qNxGzf={1O`X!s9k_MbHO_ZauLi)0_yr*Pmr!T46!_}(Wlyq&}FHV>wL22Kmk zA3%uv0*1H<6uu1hPqf3^-aW%yrzPNj0w#(Fl7F-o8ji95SB|y+sO<=ra}emj{67Qp zKh6F8GMN9Uu6rODM9aiqqg23WOEpCo7(c~~J?ZK0>Yt>tlK`94^=_qm>kqQec5T!X z=)R=jGlMUC8AL?A0rJ9vZ9cS<*B0}rEBN5`)e-9j2;IQ$b|S&B(~c1P!$GGF$M&uk zclcmW>K)U=FvWT%CMWKEg5No~`}d84d$<$8z4pUB_kH=I^zdUy?{}*KJAqFp7ypd1 z-cK|m{p9nz&n}7jG!q%_qt7T+U~ipcJ)EDt`^B@cQJBFEjv)(n;QPK8r&L2$b~hzc zYv_hObt(AVi&NL34xXkg!Pru!W`a|Opes;eG&BuLB%y&~h&-Ehcnp(Ag*@JP9ty6! 
zO(I-ri#;r*sw$eidAQDnwdu6#sBWrZsFEqW2pW35)on1vZTLWq$aL6g$$}$5_ZO!W z1G=lm7LL8Pk(|9ab)yH&VH8vCUq4XDAt=2K{}&!)>&6VgYR?fk>HeT`RBK=3{ngz4 z%6L@op{Dz*wez{-sM6k+KAre!V$oIHIwpKC8w>rL}4a#pw0Swz-utR%@azYZU$v3L#p zH00v0olWqnw2-e!#ucdJo9*(Bl*lY)mRIadVs%eW6`Ck#trV8Ay&Q?zaxvT}MO@vL zQdeiATnua2S2PN?m_U7%dZoB~y%ZUsHQ`wCzwc3YW zYC->`1N?Px@v5lDLX|?SUU6mXLa|_1lCkcbpzjWFQ5A|;RKnB!#-ECY#nf&rn?SoB z-~v7xpm$MD#vfIqO8q!*w*)6>Xi_T~imE+3gjzdKK1~5r_4iuSwxpq*7Yl6N<40k+Hqf$X#9B z%NQXD7-b9^?Rl-iGX|OX8iti+3@^DRh82iMJRe?Pz<35-n%A%lt#FW_L_!Y+IyWB0 z4|pTtYI$wGIzJy@f2ut`WMj`9Ufl6%d@7@w1ctF?1co6Elwf=VJa9nJgoEG%n-m;U zj4xl9pO0nccjo7z9OrAKR>CU*MT!`AdESj@`D!^*t4Sa>u}DL`a-iwMX7{=p3&} z3pq`KwI)UuvD}0~^Dp$_Cr;914+5=i4-yAa?JDY