Add SQLite mixed-type data handling capabilities #50

Merged: 54 commits, Jul 7, 2023
Commits
cb0440b
increase duckdb threads
d33bs Apr 20, 2023
f0a5bc8
dynamic threading for duckdb
d33bs Apr 20, 2023
d1b0bfd
add adbc
d33bs Apr 21, 2023
c8bf5f9
move to parquet for _read_data
d33bs Apr 22, 2023
edfd4c6
move prepend col ops to data read
d33bs Apr 22, 2023
105e735
pathing
d33bs Apr 23, 2023
e3eaa63
remove extension from pathing; move column naming
d33bs Apr 24, 2023
065cdd4
enable list of list for concat
d33bs Apr 24, 2023
b8aa21f
move dest path to tables reference
d33bs Apr 24, 2023
b245a2c
fix joins
d33bs Apr 25, 2023
c3bbd08
raw data parquet chunks; csv duckdb reader
d33bs Apr 25, 2023
3b6e99a
memory safety for parquet joins
d33bs Apr 25, 2023
8ef35c2
concurrency limits; sequential execution
d33bs Apr 25, 2023
1b60ca7
corrected sequenced reads; concurrency limit
d33bs Apr 26, 2023
769a897
re-generalize
d33bs Apr 26, 2023
02893b7
limited concurrent extracts
d33bs Apr 26, 2023
3e15472
concurrency setting update
d33bs Apr 26, 2023
72c3e9e
wrap functionality in local prefect api default
d33bs Apr 27, 2023
2f3fc7c
prefect server context and avoiding subflows
d33bs Apr 27, 2023
eb704c4
migrate from prefect to parsl
d33bs Apr 27, 2023
e8c5b83
parsl refinement
d33bs Apr 28, 2023
669b503
avoid the serialization issues
d33bs Apr 28, 2023
f456162
gitignore; pre-commit deps updates; linting
d33bs Apr 28, 2023
27c4c5c
further linting
d33bs Apr 28, 2023
0f64dcb
test updates
d33bs Apr 28, 2023
8feaeea
further testing revisions
d33bs Apr 28, 2023
4bd197e
s3 test update
d33bs Apr 29, 2023
adfcf91
move to join_app
d33bs Apr 29, 2023
a71323d
flatten loop
d33bs Apr 30, 2023
d1c5547
refactor to nonblocking parsl flow
d33bs Apr 30, 2023
8c8aca6
updated tests and exception handling
d33bs May 2, 2023
d32a2c7
linting
d33bs May 2, 2023
1ab86c6
update documentation
d33bs May 2, 2023
94029f8
remove unnecessary deps
d33bs May 2, 2023
096d8c1
docs; linting; AppBase for sphinx; add max_threads
d33bs May 2, 2023
7cde0d2
linting
d33bs May 2, 2023
9201484
readd python 3.8; case sensitive join work
d33bs May 3, 2023
851bbda
Apply suggestions from code review
d33bs May 4, 2023
7f5c6b7
remove dask section of gitignore
d33bs May 4, 2023
3562067
update _prepend_column_name docstring
d33bs May 4, 2023
8c7af62
fix concat dir cleanup and test
d33bs May 4, 2023
7a8955a
import safety for python_apps
d33bs May 6, 2023
86dfe82
further imports for parsl apps non-threaded use
d33bs May 7, 2023
c461d35
further parsl app imports reconfiguration
d33bs May 7, 2023
174d164
add mixed type handler for sqlite sources
d33bs May 9, 2023
52273ac
add duckdb mixed type exception handling for the chunked query
d33bs May 9, 2023
68243c4
add docstring
d33bs May 9, 2023
e6ee34b
add test for _sqlite_mixed_type_query_to_parquet
d33bs May 9, 2023
cf9b4d4
Merge remote-tracking branch 'upstream/main' into sqlite-mixed-types-…
d33bs May 15, 2023
f2b228e
wrap fixture yield with try + finally
d33bs May 24, 2023
0cfae5d
periods for docstring sentences
d33bs May 24, 2023
e19c414
Merge branch 'sqlite-mixed-types-handling' of https://github.com/d33b…
d33bs May 24, 2023
409daf9
Merge remote-tracking branch 'upstream/main' into sqlite-mixed-types-…
d33bs Jun 5, 2023
dbe9ce3
Merge remote-tracking branch 'upstream/main' into sqlite-mixed-types-…
d33bs Jul 7, 2023
72 changes: 46 additions & 26 deletions cytotable/convert.py
@@ -33,7 +33,7 @@ def _get_table_chunk_offsets(
Contains the source data to be chunked. Represents a single
file or table of some kind.
chunk_size: int
The size in rowcount of the chunks to create
The size in rowcount of the chunks to create.

Returns:
List[int]
@@ -110,27 +110,28 @@ def _source_chunk_to_parquet(

Args:
source_group_name: str
Name of the source group (for ex. compartment or metadata table name)
Name of the source group (for ex. compartment or metadata table name).
source: Dict[str, Any]
Contains the source data to be chunked. Represents a single
file or table of some kind along with collected information about table.
chunk_size: int
Row count to use for chunked output
Row count to use for chunked output.
offset: int
The offset for chunking the data from source.
dest_path: str
Path to store the output data.

Returns:
str
A string of the output filepath
A string of the output filepath.
"""

import pathlib

import duckdb
from cloudpathlib import AnyPath

from cytotable.utils import _duckdb_reader
from cytotable.utils import _duckdb_reader, _sqlite_mixed_type_query_to_parquet

# attempt to build dest_path
source_dest_path = (
@@ -152,17 +153,36 @@ def _source_chunk_to_parquet(

result_filepath = f"{result_filepath_base}-{offset}.parquet"

# isolate using new connection to read data with chunk size + offset
# and export directly to parquet via duckdb (avoiding need to return data to python)
_duckdb_reader().execute(
f"""
COPY (
{base_query}
LIMIT {chunk_size} OFFSET {offset}
) TO '{result_filepath}'
(FORMAT PARQUET);
"""
)
# attempt to read the data to parquet from duckdb
# with exception handling to read mixed-type data
# using sqlite3 and special utility function
try:
# isolate using new connection to read data with chunk size + offset
# and export directly to parquet via duckdb (avoiding need to return data to python)
_duckdb_reader().execute(
f"""
COPY (
{base_query}
LIMIT {chunk_size} OFFSET {offset}
) TO '{result_filepath}'
(FORMAT PARQUET);
"""
)
except duckdb.Error as e:
# if we see a mismatched type error
# run a more nuanced query through sqlite
# to handle the mixed types
if (
"Mismatch Type Error" in str(e)
and str(AnyPath(source["source_path"]).suffix).lower() == ".sqlite"
):
result_filepath = _sqlite_mixed_type_query_to_parquet(
source_path=str(source["source_path"]),
table_name=str(source["table_name"]),
chunk_size=chunk_size,
offset=offset,
result_filepath=result_filepath,
)

# return the filepath for the chunked output file
return result_filepath
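
Note on the fallback above: a minimal sketch of the failure mode it guards against. The database filename, table name, and direct use of DuckDB's sqlite_scanner extension are illustrative assumptions; CytoTable itself reads through _duckdb_reader() and the prepared base query.

import duckdb

# throwaway in-memory connection; assumes the sqlite_scanner extension can be installed
conn = duckdb.connect()
conn.execute("INSTALL sqlite_scanner;")
conn.execute("LOAD sqlite_scanner;")

try:
    # hypothetical table where col_real stores both REAL values and the TEXT 'nan',
    # so DuckDB cannot settle on a single column type while scanning
    conn.execute(
        "SELECT * FROM sqlite_scan('example_mixed_types.sqlite', 'tbl_a');"
    ).fetchall()
except duckdb.Error as err:
    # the same condition _source_chunk_to_parquet checks before falling back
    # to _sqlite_mixed_type_query_to_parquet
    if "Mismatch Type Error" in str(err):
        print("mixed-type column detected; falling back to the sqlite3-based export")
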
@@ -192,13 +212,13 @@ def _prepend_column_name(
Column names which are used as ID's and as a result need to be
treated differently when renaming.
metadata: Union[List[str], Tuple[str, ...]]:
List of source data names which are used as metadata
List of source data names which are used as metadata.
compartments: List[str]:
List of source data names which are used as compartments
List of source data names which are used as compartments.

Returns:
str
Path to the modified file
Path to the modified file.
"""

import pathlib
@@ -444,15 +464,15 @@ def _get_join_chunks(
sources: Dict[List[Dict[str, Any]]]:
Grouped datasets of files which will be used by other functions.
metadata: Union[List[str], Tuple[str, ...]]:
List of source data names which are used as metadata
List of source data names which are used as metadata.
chunk_columns: Union[List[str], Tuple[str, ...]]:
Column names which appear in all compartments to use when performing join
Column names which appear in all compartments to use when performing join.
chunk_size: int:
Size of join chunks which is used to limit data size during join ops
Size of join chunks which is used to limit data size during join ops.

Returns:
List[List[Dict[str, Any]]]]:
A list of lists with at most chunk size length that contain join keys
A list of lists with at most chunk size length that contain join keys.
"""

import pathlib
@@ -809,13 +829,13 @@ def _to_parquet(  # pylint: disable=too-many-arguments, too-many-locals
concat: bool:
Whether to concatenate similar files together.
join: bool:
Whether to join the compartment data together into one dataset
Whether to join the compartment data together into one dataset.
joins: str:
DuckDB-compatible SQL which will be used to perform the join operations.
chunk_columns: Optional[Union[List[str], Tuple[str, ...]]],
Column names which appear in all compartments to use when performing join
Column names which appear in all compartments to use when performing join.
chunk_size: Optional[int],
Size of join chunks which is used to limit data size during join ops
Size of join chunks which is used to limit data size during join ops.
infer_common_schema: bool: (Default value = True)
Whether to infer a common schema when concatenating sources.
drop_null: bool:
8 changes: 4 additions & 4 deletions cytotable/exceptions.py
@@ -6,23 +6,23 @@
class CytoTableException(Exception):
"""
Root exception for custom hierarchy of exceptions
with CytoTable
with CytoTable.
"""


class NoInputDataException(CytoTableException):
"""
Exception for no input data
Exception for no input data.
"""


class DatatypeException(CytoTableException):
"""
Exception for datatype challenges
Exception for datatype challenges.
"""


class SchemaException(CytoTableException):
"""
Exception for schema challenges
Exception for schema challenges.
"""
6 changes: 3 additions & 3 deletions cytotable/sources.py
@@ -22,7 +22,7 @@ def _build_path(
Path to seek filepaths within.
**kwargs: Any
keyword arguments to be used with
Cloudpathlib.CloudPath.client
Cloudpathlib.CloudPath.client.

Returns:
Union[pathlib.Path, Any]
@@ -207,7 +207,7 @@ def _filter_source_filepaths(
sources: Dict[str, List[Dict[str, Any]]], source_datatype: str
) -> Dict[str, List[Dict[str, Any]]]:
"""
Filter source filepaths based on provided source_datatype
Filter source filepaths based on provided source_datatype.

Args:
sources: Dict[str, List[Dict[str, Any]]]
@@ -245,7 +245,7 @@ def _gather_sources(
**kwargs,
) -> Dict[str, List[Dict[str, Any]]]:
"""
Flow for gathering data sources for conversion
Flow for gathering data sources for conversion.

Args:
source_path: str:
89 changes: 88 additions & 1 deletion cytotable/utils.py
@@ -47,7 +47,7 @@ def Parsl_AppBase_init_for_docs(self, func, *args, **kwargs):

def _default_parsl_config():
"""
Return a default Parsl configuration for use with CytoTable
Return a default Parsl configuration for use with CytoTable.
"""
return Config(
executors=[ThreadPoolExecutor(max_threads=MAX_THREADS, label="local_threads")]
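
Aside on the default configuration above: a minimal sketch of activating it in a standalone script, assuming _default_parsl_config is importable as defined in this module; how CytoTable itself loads the configuration is not shown in this diff.

import parsl

from cytotable.utils import _default_parsl_config

# register the thread-pool executor so parsl python_apps run on local threads
parsl.load(_default_parsl_config())
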
@@ -149,6 +149,93 @@ def _duckdb_reader() -> duckdb.DuckDBPyConnection:
)


def _sqlite_mixed_type_query_to_parquet(
source_path: str,
table_name: str,
chunk_size: int,
offset: int,
result_filepath: str,
) -> str:
"""
Performs SQLite table data extraction where one or many
columns include data values of potentially mismatched type
such that the data may be exported to Arrow and a Parquet file.

Args:
source_path: str:
A str which is a path to a SQLite database file.
table_name: str:
The name of the table being queried.
chunk_size: int:
Row count to use for chunked output.
offset: int:
The offset for chunking the data from source.
result_filepath: str:
Path for the resulting Parquet output file.

Returns:
str:
The resulting filepath for the table exported to parquet.
"""
import sqlite3

import pyarrow as pa
import pyarrow.parquet as parquet

# open sqlite3 connection
with sqlite3.connect(source_path) as conn:
cursor = conn.cursor()

# gather table column details including datatype
cursor.execute(
f"""
SELECT :table_name as table_name,
name as column_name,
type as column_type
FROM pragma_table_info(:table_name);
""",
{"table_name": table_name},
)

# gather column metadata details as list of dictionaries
column_info = [
dict(zip([desc[0] for desc in cursor.description], row))
for row in cursor.fetchall()
]

# create cases for mixed-type handling in each column discovered above
query_parts = [
f"""
CASE
/* when the storage class type doesn't match the column, return nulltype */
WHEN typeof({col['column_name']}) != '{col['column_type'].lower()}' THEN NULL
/* else, return the normal value */
ELSE {col['column_name']}
END AS {col['column_name']}
"""
for col in column_info
]

# perform the select using the cases built above and using chunksize + offset
cursor.execute(
f'SELECT {", ".join(query_parts)} FROM {table_name} LIMIT {chunk_size} OFFSET {offset};'
)
# collect the results and include the column name with values
results = [
dict(zip([desc[0] for desc in cursor.description], row))
for row in cursor.fetchall()
]

# write results to a parquet file
parquet.write_table(
table=pa.Table.from_pylist(results),
where=result_filepath,
)

# return filepath
return result_filepath


def _cache_cloudpath_to_local(path: Union[str, AnyPath]) -> pathlib.Path:
"""
Takes a cloudpath and uses cache to convert to a local copy
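
To make the CASE-based query built in _sqlite_mixed_type_query_to_parquet above concrete, a small standalone sketch of the SELECT it produces for a hypothetical two-column table (the column list, table name, chunk size, and offset are illustrative; the real column metadata comes from pragma_table_info):

# hypothetical column metadata, in the shape produced by the pragma_table_info query
column_info = [
    {"column_name": "col_integer", "column_type": "INTEGER"},
    {"column_name": "col_real", "column_type": "REAL"},
]

# mirror the per-column CASE expressions used above: values whose SQLite storage
# class disagrees with the declared column type become NULL
query_parts = [
    f"CASE WHEN typeof({col['column_name']}) != '{col['column_type'].lower()}' "
    f"THEN NULL ELSE {col['column_name']} END AS {col['column_name']}"
    for col in column_info
]

# prints (roughly):
# SELECT CASE WHEN typeof(col_integer) != 'integer' THEN NULL ELSE col_integer END AS col_integer,
#        CASE WHEN typeof(col_real) != 'real' THEN NULL ELSE col_real END AS col_real
# FROM tbl_a LIMIT 50 OFFSET 0;
print(f'SELECT {", ".join(query_parts)} FROM tbl_a LIMIT 50 OFFSET 0;')
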
57 changes: 57 additions & 0 deletions tests/conftest.py
@@ -3,6 +3,7 @@
"""
import pathlib
import shutil
import sqlite3
import subprocess
import tempfile
from typing import Any, Dict, Generator, List, Tuple
@@ -488,3 +489,59 @@ def example_s3_endpoint(

# return endpoint url for use in testing
return endpoint_url


@pytest.fixture()
def example_sqlite_mixed_types_database(
get_tempdir: str,
) -> Generator:
"""
Creates a database which includes mixed type columns
for testing specific functionality within CytoTable
"""

# create a temporary sqlite connection
filepath = f"{get_tempdir}/example_mixed_types.sqlite"

# statements for creating database with simple structure
create_stmts = [
"DROP TABLE IF EXISTS tbl_a;",
"""
CREATE TABLE tbl_a (
col_integer INTEGER NOT NULL
,col_text TEXT
,col_blob BLOB
,col_real REAL
);
""",
]

# some example values to insert into the database
insert_vals = [1, "sample", b"sample_blob", 0.5]
err_values = ["nan", "sample", b"another_blob", "nan"]

# create the database and insert some data into it
with sqlite3.connect(filepath) as connection:
for stmt in create_stmts:
connection.execute(stmt)

connection.execute(
(
"INSERT INTO tbl_a (col_integer, col_text, col_blob, col_real)"
"VALUES (?, ?, ?, ?);"
),
insert_vals,
)
connection.execute(
(
"INSERT INTO tbl_a (col_integer, col_text, col_blob, col_real)"
"VALUES (?, ?, ?, ?);"
),
err_values,
)

try:
yield filepath
finally:
# after completing the tests, remove the file
pathlib.Path(filepath).unlink()
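
The commit history above references a test for _sqlite_mixed_type_query_to_parquet; a minimal sketch of how this fixture might be exercised follows (the test body, output filename, and assertions are assumptions for illustration, not the PR's actual test):

import pyarrow.parquet as parquet

from cytotable.utils import _sqlite_mixed_type_query_to_parquet


def test_sqlite_mixed_type_query_to_parquet(
    get_tempdir: str, example_sqlite_mixed_types_database: str
):
    """
    Exercise the mixed-type SQLite export against the fixture database above.
    """
    result_filepath = _sqlite_mixed_type_query_to_parquet(
        source_path=example_sqlite_mixed_types_database,
        table_name="tbl_a",
        chunk_size=2,
        offset=0,
        result_filepath=f"{get_tempdir}/tbl_a.parquet",
    )

    table = parquet.read_table(source=result_filepath)

    # both rows export; the mismatched 'nan' text stored in col_integer and
    # col_real comes back as NULL instead of failing the read
    assert table.num_rows == 2
    assert table.column("col_integer").null_count == 1
    assert table.column("col_real").null_count == 1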