Add SQLite mixed-type data handling capabilities (#50)
* increase duckdb threads

* dynamic threading for duckdb

* add adbc

* move to parquet for _read_data

* move prepend col ops to data read

* pathing

* remove extension from pathing; move column naming

* enable list of list for concat

* move dest path to tables reference

* fix joins

* raw data parquet chunks; csv duckdb reader

+ preset overrides

* memory safety for parquet joins

* concurrency limits; sequential execution

* corrected sequenced reads; concurrency limit

* re-generalize

* limited concurrent extracts

* concurrency setting update

* wrap functionality in local prefect api default

* prefect server context and avoiding subflows

* migrate from prefect to parsl

* parsl refinement

* avoid the serialization issues

* gitignore; pre-commit deps updates; linting

* further linting

* test updates

* further testing revisions

* s3 test update

* move to join_app

* flatten loop

* refactor to nonblocking parsl flow

* updated tests and exception handling

* linting

* update documentation

* remove unnecessary deps

* docs; linting; AppBase for sphinx; add max_threads

* linting

* readd python 3.8; case sensitive join work

move from logging.warn to logging.warning (avoid deprecated calls)

* Apply suggestions from code review

Co-authored-by: Gregory Way <[email protected]>

* remove dask section of gitignore

* update _prepend_column_name docstring

Co-Authored-By: Gregory Way <[email protected]>

* fix concat dir cleanup and test

* import safety for python_apps

* further imports for parsl apps non-threaded use

Co-Authored-By: Faisal Alquaddoomi <[email protected]>

* further parsl app imports reconfiguration

Co-Authored-By: Faisal Alquaddoomi <[email protected]>

* add mixed type handler for sqlite sources

* add duckdb mixed type exception handling for the chunked query

* add docstring

* add test for _sqlite_mixed_type_query_to_parquet

* wrap fixture yield with try + finally

Co-authored-by: Faisal Alquaddoomi <[email protected]>

* periods for docstring sentences

---------

Co-authored-by: Gregory Way <[email protected]>
Co-authored-by: Faisal Alquaddoomi <[email protected]>
Co-authored-by: Faisal Alquaddoomi <[email protected]>
4 people authored Jul 7, 2023
1 parent 2dbc90b commit e41256f
Showing 6 changed files with 260 additions and 35 deletions.
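
This change set makes CytoTable tolerant of SQLite sources whose columns mix storage classes (for example, text stored in a REAL column). A minimal usage sketch follows, assuming the cytotable.convert entry point; the preset name and file paths below are illustrative assumptions, not taken from this commit.

import cytotable

# Minimal sketch (not part of this commit): convert a SQLite source that
# contains mixed-type columns to Parquet. Paths and the preset name are
# illustrative assumptions.
result = cytotable.convert(
    source_path="example_mixed_types.sqlite",  # SQLite file with mismatched storage classes
    dest_path="./converted",
    dest_datatype="parquet",
    preset="cellprofiler_sqlite",  # assumed preset name
)
print(result)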
72 changes: 46 additions & 26 deletions cytotable/convert.py
@@ -33,7 +33,7 @@ def _get_table_chunk_offsets(
Contains the source data to be chunked. Represents a single
file or table of some kind.
chunk_size: int
The size in rowcount of the chunks to create
The size in rowcount of the chunks to create.
Returns:
List[int]
@@ -110,27 +110,28 @@ def _source_chunk_to_parquet(
Args:
source_group_name: str
Name of the source group (for ex. compartment or metadata table name)
Name of the source group (for ex. compartment or metadata table name).
source: Dict[str, Any]
Contains the source data to be chunked. Represents a single
file or table of some kind along with collected information about table.
chunk_size: int
Row count to use for chunked output
Row count to use for chunked output.
offset: int
The offset for chunking the data from source.
dest_path: str
Path to store the output data.
Returns:
str
A string of the output filepath
A string of the output filepath.
"""

import pathlib

import duckdb
from cloudpathlib import AnyPath

from cytotable.utils import _duckdb_reader
from cytotable.utils import _duckdb_reader, _sqlite_mixed_type_query_to_parquet

# attempt to build dest_path
source_dest_path = (
@@ -152,17 +153,36 @@ def _source_chunk_to_parquet(

result_filepath = f"{result_filepath_base}-{offset}.parquet"

# isolate using new connection to read data with chunk size + offset
# and export directly to parquet via duckdb (avoiding need to return data to python)
_duckdb_reader().execute(
f"""
COPY (
{base_query}
LIMIT {chunk_size} OFFSET {offset}
) TO '{result_filepath}'
(FORMAT PARQUET);
"""
)
# attempt to read the data to parquet from duckdb
# with exception handling to read mixed-type data
# using sqlite3 and special utility function
try:
# isolate using new connection to read data with chunk size + offset
# and export directly to parquet via duckdb (avoiding need to return data to python)
_duckdb_reader().execute(
f"""
COPY (
{base_query}
LIMIT {chunk_size} OFFSET {offset}
) TO '{result_filepath}'
(FORMAT PARQUET);
"""
)
except duckdb.Error as e:
# if we see a mismatched type error
# run a more nuanced query through sqlite
# to handle the mixed types
if (
"Mismatch Type Error" in str(e)
and str(AnyPath(source["source_path"]).suffix).lower() == ".sqlite"
):
result_filepath = _sqlite_mixed_type_query_to_parquet(
source_path=str(source["source_path"]),
table_name=str(source["table_name"]),
chunk_size=chunk_size,
offset=offset,
result_filepath=result_filepath,
)

# return the filepath for the chunked output file
return result_filepath
@@ -192,13 +212,13 @@ def _prepend_column_name(
Column names which are used as ID's and as a result need to be
treated differently when renaming.
metadata: Union[List[str], Tuple[str, ...]]:
List of source data names which are used as metadata
List of source data names which are used as metadata.
compartments: List[str]:
List of source data names which are used as compartments
List of source data names which are used as compartments.
Returns:
str
Path to the modified file
Path to the modified file.
"""

import pathlib
@@ -444,15 +464,15 @@ def _get_join_chunks(
sources: Dict[List[Dict[str, Any]]]:
Grouped datasets of files which will be used by other functions.
metadata: Union[List[str], Tuple[str, ...]]:
List of source data names which are used as metadata
List of source data names which are used as metadata.
chunk_columns: Union[List[str], Tuple[str, ...]]:
Column names which appear in all compartments to use when performing join
Column names which appear in all compartments to use when performing join.
chunk_size: int:
Size of join chunks which is used to limit data size during join ops
Size of join chunks which is used to limit data size during join ops.
Returns:
List[List[Dict[str, Any]]]]:
A list of lists with at most chunk size length that contain join keys
A list of lists with at most chunk size length that contain join keys.
"""

import pathlib
@@ -809,13 +829,13 @@ def _to_parquet( # pylint: disable=too-many-arguments, too-many-locals
concat: bool:
Whether to concatenate similar files together.
join: bool:
Whether to join the compartment data together into one dataset
Whether to join the compartment data together into one dataset.
joins: str:
DuckDB-compatible SQL which will be used to perform the join operations.
chunk_columns: Optional[Union[List[str], Tuple[str, ...]]],
Column names which appear in all compartments to use when performing join
Column names which appear in all compartments to use when performing join.
chunk_size: Optional[int],
Size of join chunks which is used to limit data size during join ops
Size of join chunks which is used to limit data size during join ops.
infer_common_schema: bool: (Default value = True)
Whether to infer a common schema when concatenating sources.
drop_null: bool:
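
The DuckDB-first, sqlite3-fallback flow added in this file can also be read as a standalone sketch. Assumptions: DuckDB's sqlite_scanner extension provides sqlite_scan, and the helper added in cytotable/utils.py below is importable; this is not the library's own convert code.

import duckdb

from cytotable.utils import _sqlite_mixed_type_query_to_parquet


def chunk_to_parquet_sketch(
    source_path: str, table_name: str, chunk_size: int, offset: int, result_filepath: str
) -> str:
    """Sketch of the DuckDB-first chunk export with a sqlite3-based fallback."""
    con = duckdb.connect()
    con.execute("INSTALL sqlite_scanner;")
    con.execute("LOAD sqlite_scanner;")
    try:
        # export the chunk straight to parquet without returning rows to python
        con.execute(
            f"""
            COPY (
                SELECT * FROM sqlite_scan('{source_path}', '{table_name}')
                LIMIT {chunk_size} OFFSET {offset}
            ) TO '{result_filepath}' (FORMAT PARQUET);
            """
        )
    except duckdb.Error as exc:
        # DuckDB surfaces mixed SQLite storage classes as a "Mismatch Type Error";
        # hand the chunk to the sqlite3-based helper instead
        if "Mismatch Type Error" in str(exc):
            result_filepath = _sqlite_mixed_type_query_to_parquet(
                source_path=source_path,
                table_name=table_name,
                chunk_size=chunk_size,
                offset=offset,
                result_filepath=result_filepath,
            )
        else:
            raise
    return result_filepath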
8 changes: 4 additions & 4 deletions cytotable/exceptions.py
@@ -6,23 +6,23 @@
class CytoTableException(Exception):
"""
Root exception for custom hierarchy of exceptions
with CytoTable
with CytoTable.
"""


class NoInputDataException(CytoTableException):
"""
Exception for no input data
Exception for no input data.
"""


class DatatypeException(CytoTableException):
"""
Exception for datatype challenges
Exception for datatype challenges.
"""


class SchemaException(CytoTableException):
"""
Exception for schema challenges
Exception for schema challenges.
"""
6 changes: 3 additions & 3 deletions cytotable/sources.py
@@ -22,7 +22,7 @@ def _build_path(
Path to seek filepaths within.
**kwargs: Any
keyword arguments to be used with
Cloudpathlib.CloudPath.client
Cloudpathlib.CloudPath.client .
Returns:
Union[pathlib.Path, Any]
@@ -207,7 +207,7 @@ def _filter_source_filepaths(
sources: Dict[str, List[Dict[str, Any]]], source_datatype: str
) -> Dict[str, List[Dict[str, Any]]]:
"""
Filter source filepaths based on provided source_datatype
Filter source filepaths based on provided source_datatype.
Args:
sources: Dict[str, List[Dict[str, Any]]]
@@ -245,7 +245,7 @@ def _gather_sources(
**kwargs,
) -> Dict[str, List[Dict[str, Any]]]:
"""
Flow for gathering data sources for conversion
Flow for gathering data sources for conversion.
Args:
source_path: str:
89 changes: 88 additions & 1 deletion cytotable/utils.py
@@ -47,7 +47,7 @@ def Parsl_AppBase_init_for_docs(self, func, *args, **kwargs):

def _default_parsl_config():
"""
Return a default Parsl configuration for use with CytoTable
Return a default Parsl configuration for use with CytoTable.
"""
return Config(
executors=[ThreadPoolExecutor(max_threads=MAX_THREADS, label="local_threads")]
@@ -149,6 +149,93 @@ def _duckdb_reader() -> duckdb.DuckDBPyConnection:
)


def _sqlite_mixed_type_query_to_parquet(
source_path: str,
table_name: str,
chunk_size: int,
offset: int,
result_filepath: str,
) -> str:
"""
Performs SQLite table data extraction where one or many
columns include data values of potentially mismatched type
such that the data may be exported to Arrow and a Parquet file.
Args:
source_path: str:
A str which is a path to a SQLite database file.
table_name: str:
The name of the table being queried.
chunk_size: int:
Row count to use for chunked output.
offset: int:
The offset for chunking the data from source.
result_filepath: str:
Path to store the output data.
Returns:
str:
The resulting filepath for the table exported to parquet.
"""
import sqlite3

import pyarrow as pa
import pyarrow.parquet as parquet

# open sqlite3 connection
with sqlite3.connect(source_path) as conn:
cursor = conn.cursor()

# gather table column details including datatype
cursor.execute(
f"""
SELECT :table_name as table_name,
name as column_name,
type as column_type
FROM pragma_table_info(:table_name);
""",
{"table_name": table_name},
)

# gather column metadata details as list of dictionaries
column_info = [
dict(zip([desc[0] for desc in cursor.description], row))
for row in cursor.fetchall()
]

# create cases for mixed-type handling in each column discovered above
query_parts = [
f"""
CASE
/* when the storage class type doesn't match the column, return nulltype */
WHEN typeof({col['column_name']}) != '{col['column_type'].lower()}' THEN NULL
/* else, return the normal value */
ELSE {col['column_name']}
END AS {col['column_name']}
"""
for col in column_info
]

# perform the select using the cases built above and using chunksize + offset
cursor.execute(
f'SELECT {", ".join(query_parts)} FROM {table_name} LIMIT {chunk_size} OFFSET {offset};'
)
# collect the results and include the column name with values
results = [
dict(zip([desc[0] for desc in cursor.description], row))
for row in cursor.fetchall()
]

# write results to a parquet file
parquet.write_table(
table=pa.Table.from_pylist(results),
where=result_filepath,
)

# return filepath
return result_filepath


def _cache_cloudpath_to_local(path: Union[str, AnyPath]) -> pathlib.Path:
"""
Takes a cloudpath and uses cache to convert to a local copy
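
The CASE/typeof() guard in the new helper exists because SQLite's flexible typing lets a value of one storage class live in a column declared as another. A small throwaway illustration of that behavior, using only the standard-library sqlite3 module:

import sqlite3

con = sqlite3.connect(":memory:")
con.execute("CREATE TABLE tbl_a (col_real REAL);")
# 'nan' cannot be coerced to a number, so SQLite stores it with TEXT storage class
con.execute("INSERT INTO tbl_a (col_real) VALUES (0.5), ('nan');")

# typeof() exposes the per-value storage class
print(con.execute("SELECT col_real, typeof(col_real) FROM tbl_a;").fetchall())
# expected: [(0.5, 'real'), ('nan', 'text')]

# the same CASE guard used by _sqlite_mixed_type_query_to_parquet nulls the mismatch
rows = con.execute(
    """
    SELECT CASE
        WHEN typeof(col_real) != 'real' THEN NULL
        ELSE col_real
    END AS col_real
    FROM tbl_a;
    """
).fetchall()
print(rows)  # expected: [(0.5,), (None,)]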
57 changes: 57 additions & 0 deletions tests/conftest.py
@@ -3,6 +3,7 @@
"""
import pathlib
import shutil
import sqlite3
import subprocess
import tempfile
from typing import Any, Dict, Generator, List, Tuple
@@ -488,3 +489,59 @@ def example_s3_endpoint(

# return endpoint url for use in testing
return endpoint_url


@pytest.fixture()
def example_sqlite_mixed_types_database(
get_tempdir: str,
) -> Generator:
"""
Creates a database which includes mixed type columns
for testing specific functionality within CytoTable
"""

# create a temporary sqlite connection
filepath = f"{get_tempdir}/example_mixed_types.sqlite"

# statements for creating database with simple structure
create_stmts = [
"DROP TABLE IF EXISTS tbl_a;",
"""
CREATE TABLE tbl_a (
col_integer INTEGER NOT NULL
,col_text TEXT
,col_blob BLOB
,col_real REAL
);
""",
]

# some example values to insert into the database
insert_vals = [1, "sample", b"sample_blob", 0.5]
err_values = ["nan", "sample", b"another_blob", "nan"]

# create the database and insert some data into it
with sqlite3.connect(filepath) as connection:
for stmt in create_stmts:
connection.execute(stmt)

connection.execute(
(
"INSERT INTO tbl_a (col_integer, col_text, col_blob, col_real)"
"VALUES (?, ?, ?, ?);"
),
insert_vals,
)
connection.execute(
(
"INSERT INTO tbl_a (col_integer, col_text, col_blob, col_real)"
"VALUES (?, ?, ?, ?);"
),
err_values,
)

try:
yield filepath
finally:
# after completing the tests, remove the file
pathlib.Path(filepath).unlink()
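
The test added for _sqlite_mixed_type_query_to_parquet is not shown in this view; a sketch of how the fixture above might exercise the helper follows (assertion values here are illustrative, not copied from the commit).

import pyarrow.parquet as parquet

from cytotable.utils import _sqlite_mixed_type_query_to_parquet


def test_sqlite_mixed_type_query_to_parquet(
    get_tempdir: str, example_sqlite_mixed_types_database: str
):
    """Sketch: the mixed-type row should survive with mismatched values nulled."""
    result = _sqlite_mixed_type_query_to_parquet(
        source_path=example_sqlite_mixed_types_database,
        table_name="tbl_a",
        chunk_size=2,
        offset=0,
        result_filepath=f"{get_tempdir}/tbl_a.parquet",
    )

    table = parquet.read_table(source=result)

    # both inserted rows are present
    assert table.num_rows == 2
    # the 'nan' text stored in the INTEGER and REAL columns becomes NULL
    assert table.column("col_integer").null_count == 1
    assert table.column("col_real").null_count == 1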