diff --git a/cytotable/convert.py b/cytotable/convert.py
index b0dc5428..96d6f4e6 100644
--- a/cytotable/convert.py
+++ b/cytotable/convert.py
@@ -33,7 +33,7 @@ def _get_table_chunk_offsets(
             Contains the source data to be chunked. Represents a single
             file or table of some kind.
         chunk_size: int
-            The size in rowcount of the chunks to create
+            The size in rowcount of the chunks to create.
 
     Returns:
         List[int]
@@ -110,12 +110,12 @@ def _source_chunk_to_parquet(
 
     Args:
         source_group_name: str
-            Name of the source group (for ex. compartment or metadata table name)
+            Name of the source group (for ex. compartment or metadata table name).
         source: Dict[str, Any]
            Contains the source data to be chunked. Represents a single
            file or table of some kind along with collected information about table.
         chunk_size: int
-            Row count to use for chunked output
+            Row count to use for chunked output.
        offset: int
            The offset for chunking the data from source.
        dest_path: str
@@ -123,14 +123,15 @@
 
     Returns:
        str
-            A string of the output filepath
+            A string of the output filepath.
    """
 
    import pathlib
 
+    import duckdb
    from cloudpathlib import AnyPath
 
-    from cytotable.utils import _duckdb_reader
+    from cytotable.utils import _duckdb_reader, _sqlite_mixed_type_query_to_parquet
 
    # attempt to build dest_path
    source_dest_path = (
@@ -152,17 +153,36 @@
 
    result_filepath = f"{result_filepath_base}-{offset}.parquet"
 
-    # isolate using new connection to read data with chunk size + offset
-    # and export directly to parquet via duckdb (avoiding need to return data to python)
-    _duckdb_reader().execute(
-        f"""
-        COPY (
-            {base_query}
-            LIMIT {chunk_size} OFFSET {offset}
-        ) TO '{result_filepath}'
-        (FORMAT PARQUET);
-        """
-    )
+    # attempt to read the data to parquet from duckdb
+    # with exception handling to read mixed-type data
+    # using sqlite3 and special utility function
+    try:
+        # isolate using new connection to read data with chunk size + offset
+        # and export directly to parquet via duckdb (avoiding need to return data to python)
+        _duckdb_reader().execute(
+            f"""
+            COPY (
+                {base_query}
+                LIMIT {chunk_size} OFFSET {offset}
+            ) TO '{result_filepath}'
+            (FORMAT PARQUET);
+            """
+        )
+    except duckdb.Error as e:
+        # if we see a mismatched type error
+        # run a more nuanced query through sqlite
+        # to handle the mixed types
+        if (
+            "Mismatch Type Error" in str(e)
+            and str(AnyPath(source["source_path"]).suffix).lower() == ".sqlite"
+        ):
+            result_filepath = _sqlite_mixed_type_query_to_parquet(
+                source_path=str(source["source_path"]),
+                table_name=str(source["table_name"]),
+                chunk_size=chunk_size,
+                offset=offset,
+                result_filepath=result_filepath,
+            )
 
    # return the filepath for the chunked output file
    return result_filepath
@@ -192,13 +212,13 @@ def _prepend_column_name(
            Column names which are used as ID's and as a result need to be
            treated differently when renaming.
        metadata: Union[List[str], Tuple[str, ...]]:
-            List of source data names which are used as metadata
+            List of source data names which are used as metadata.
        compartments: List[str]:
-            List of source data names which are used as compartments
+            List of source data names which are used as compartments.
 
    Returns:
        str
-            Path to the modified file
+            Path to the modified file.
    """
 
    import pathlib
@@ -444,15 +464,15 @@ def _get_join_chunks(
        sources: Dict[List[Dict[str, Any]]]:
            Grouped datasets of files which will be used by other functions.
        metadata: Union[List[str], Tuple[str, ...]]:
-            List of source data names which are used as metadata
+            List of source data names which are used as metadata.
        chunk_columns: Union[List[str], Tuple[str, ...]]:
-            Column names which appear in all compartments to use when performing join
+            Column names which appear in all compartments to use when performing join.
        chunk_size: int:
-            Size of join chunks which is used to limit data size during join ops
+            Size of join chunks which is used to limit data size during join ops.
 
    Returns:
        List[List[Dict[str, Any]]]]:
-            A list of lists with at most chunk size length that contain join keys
+            A list of lists with at most chunk size length that contain join keys.
    """
 
    import pathlib
@@ -809,13 +829,13 @@ def _to_parquet(  # pylint: disable=too-many-arguments, too-many-locals
        concat: bool:
            Whether to concatenate similar files together.
        join: bool:
-            Whether to join the compartment data together into one dataset
+            Whether to join the compartment data together into one dataset.
        joins: str:
            DuckDB-compatible SQL which will be used to perform the join operations.
        chunk_columns: Optional[Union[List[str], Tuple[str, ...]]],
-            Column names which appear in all compartments to use when performing join
+            Column names which appear in all compartments to use when performing join.
        chunk_size: Optional[int],
-            Size of join chunks which is used to limit data size during join ops
+            Size of join chunks which is used to limit data size during join ops.
        infer_common_schema: bool:  (Default value = True)
            Whether to infer a common schema when concatenating sources.
        drop_null: bool:
diff --git a/cytotable/exceptions.py b/cytotable/exceptions.py
index bd623add..ec6bb37d 100644
--- a/cytotable/exceptions.py
+++ b/cytotable/exceptions.py
@@ -6,23 +6,23 @@
 class CytoTableException(Exception):
    """
    Root exception for custom hierarchy of exceptions
-    with CytoTable
+    with CytoTable.
    """
 
 
 class NoInputDataException(CytoTableException):
    """
-    Exception for no input data
+    Exception for no input data.
    """
 
 
 class DatatypeException(CytoTableException):
    """
-    Exception for datatype challenges
+    Exception for datatype challenges.
    """
 
 
 class SchemaException(CytoTableException):
    """
-    Exception for schema challenges
+    Exception for schema challenges.
    """
diff --git a/cytotable/sources.py b/cytotable/sources.py
index c5301f89..04d16ee3 100644
--- a/cytotable/sources.py
+++ b/cytotable/sources.py
@@ -22,7 +22,7 @@ def _build_path(
            Path to seek filepaths within.
        **kwargs: Any
            keyword arguments to be used with
-            Cloudpathlib.CloudPath.client
+            Cloudpathlib.CloudPath.client.
 
    Returns:
        Union[pathlib.Path, Any]
@@ -207,7 +207,7 @@ def _filter_source_filepaths(
    sources: Dict[str, List[Dict[str, Any]]], source_datatype: str
 ) -> Dict[str, List[Dict[str, Any]]]:
    """
-    Filter source filepaths based on provided source_datatype
+    Filter source filepaths based on provided source_datatype.
 
    Args:
        sources: Dict[str, List[Dict[str, Any]]]
@@ -245,7 +245,7 @@ def _gather_sources(
    **kwargs,
 ) -> Dict[str, List[Dict[str, Any]]]:
    """
-    Flow for gathering data sources for conversion
+    Flow for gathering data sources for conversion.
 
    Args:
        source_path: str:
diff --git a/cytotable/utils.py b/cytotable/utils.py
index 925daf27..0c95a713 100644
--- a/cytotable/utils.py
+++ b/cytotable/utils.py
@@ -47,7 +47,7 @@ def Parsl_AppBase_init_for_docs(self, func, *args, **kwargs):
 def _default_parsl_config():
    """
-    Return a default Parsl configuration for use with CytoTable
+    Return a default Parsl configuration for use with CytoTable.
""" return Config( executors=[ThreadPoolExecutor(max_threads=MAX_THREADS, label="local_threads")] @@ -149,6 +149,93 @@ def _duckdb_reader() -> duckdb.DuckDBPyConnection: ) +def _sqlite_mixed_type_query_to_parquet( + source_path: str, + table_name: str, + chunk_size: int, + offset: int, + result_filepath: str, +) -> str: + """ + Performs SQLite table data extraction where one or many + columns include data values of potentially mismatched type + such that the data may be exported to Arrow and a Parquet file. + + Args: + source_path: str: + A str which is a path to a SQLite database file. + table_name: str: + The name of the table being queried. + chunk_size: int: + Row count to use for chunked output. + offset: int: + The offset for chunking the data from source. + dest_path: str: + Path to store the output data. + + Returns: + str: + The resulting filepath for the table exported to parquet. + """ + import sqlite3 + + import pyarrow as pa + import pyarrow.parquet as parquet + + # open sqlite3 connection + with sqlite3.connect(source_path) as conn: + cursor = conn.cursor() + + # gather table column details including datatype + cursor.execute( + f""" + SELECT :table_name as table_name, + name as column_name, + type as column_type + FROM pragma_table_info(:table_name); + """, + {"table_name": table_name}, + ) + + # gather column metadata details as list of dictionaries + column_info = [ + dict(zip([desc[0] for desc in cursor.description], row)) + for row in cursor.fetchall() + ] + + # create cases for mixed-type handling in each column discovered above + query_parts = [ + f""" + CASE + /* when the storage class type doesn't match the column, return nulltype */ + WHEN typeof({col['column_name']}) != '{col['column_type'].lower()}' THEN NULL + /* else, return the normal value */ + ELSE {col['column_name']} + END AS {col['column_name']} + """ + for col in column_info + ] + + # perform the select using the cases built above and using chunksize + offset + cursor.execute( + f'SELECT {", ".join(query_parts)} FROM {table_name} LIMIT {chunk_size} OFFSET {offset};' + ) + # collect the results and include the column name with values + results = [ + dict(zip([desc[0] for desc in cursor.description], row)) + for row in cursor.fetchall() + ] + + # write results to a parquet file + parquet.write_table( + table=pa.Table.from_pylist(results), + where=result_filepath, + ) + + # return filepath + return result_filepath + + def _cache_cloudpath_to_local(path: Union[str, AnyPath]) -> pathlib.Path: """ Takes a cloudpath and uses cache to convert to a local copy diff --git a/tests/conftest.py b/tests/conftest.py index 769e9153..a6e5c735 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -3,6 +3,7 @@ """ import pathlib import shutil +import sqlite3 import subprocess import tempfile from typing import Any, Dict, Generator, List, Tuple @@ -488,3 +489,59 @@ def example_s3_endpoint( # return endpoint url for use in testing return endpoint_url + + +@pytest.fixture() +def example_sqlite_mixed_types_database( + get_tempdir: str, +) -> Generator: + """ + Creates a database which includes mixed type columns + for testing specific functionality within CytoTable + """ + + # create a temporary sqlite connection + filepath = f"{get_tempdir}/example_mixed_types.sqlite" + + # statements for creating database with simple structure + create_stmts = [ + "DROP TABLE IF EXISTS tbl_a;", + """ + CREATE TABLE tbl_a ( + col_integer INTEGER NOT NULL + ,col_text TEXT + ,col_blob BLOB + ,col_real REAL + ); + """, + ] + + # some example 
+    insert_vals = [1, "sample", b"sample_blob", 0.5]
+    err_values = ["nan", "sample", b"another_blob", "nan"]
+
+    # create the database and insert some data into it
+    with sqlite3.connect(filepath) as connection:
+        for stmt in create_stmts:
+            connection.execute(stmt)
+
+        connection.execute(
+            (
+                "INSERT INTO tbl_a (col_integer, col_text, col_blob, col_real)"
+                "VALUES (?, ?, ?, ?);"
+            ),
+            insert_vals,
+        )
+        connection.execute(
+            (
+                "INSERT INTO tbl_a (col_integer, col_text, col_blob, col_real)"
+                "VALUES (?, ?, ?, ?);"
+            ),
+            err_values,
+        )
+
+    try:
+        yield filepath
+    finally:
+        # after completing the tests, remove the file
+        pathlib.Path(filepath).unlink()
diff --git a/tests/test_convert.py b/tests/test_convert.py
index 265463ce..9ea08064 100644
--- a/tests/test_convert.py
+++ b/tests/test_convert.py
@@ -9,6 +9,7 @@
 from shutil import copy
 from typing import Any, Dict, List, Tuple, cast
 
+import duckdb
 import parsl
 import pyarrow as pa
 import pytest
@@ -31,7 +32,11 @@
 )
 from cytotable.presets import config
 from cytotable.sources import _get_source_filepaths, _infer_source_datatype
-from cytotable.utils import _column_sort, _duckdb_reader
+from cytotable.utils import (
+    _column_sort,
+    _duckdb_reader,
+    _sqlite_mixed_type_query_to_parquet,
+)
 
 
 def test_config():
@@ -835,6 +840,62 @@ def test_convert_cellprofiler_sqlite_pycytominer_merge(
    assert pycytominer_table.shape == cytotable_table.shape
 
 
+def test_sqlite_mixed_type_query_to_parquet(
+    get_tempdir: str, example_sqlite_mixed_types_database: str
+):
+    """
+    Testing _sqlite_mixed_type_query_to_parquet
+    """
+
+    result_filepath = f"{get_tempdir}/example_mixed_types_tbl_a.parquet"
+    table_name = "tbl_a"
+
+    try:
+        # attempt to read the data using DuckDB
+        result = _duckdb_reader().execute(
+            f"""COPY (
+                select * from sqlite_scan('{example_sqlite_mixed_types_database}','{table_name}')
+                LIMIT 2 OFFSET 0
+                ) TO '{result_filepath}'
+                (FORMAT PARQUET)
+            """
+        )
+    except duckdb.Error as duckdb_exc:
+        # if we see a mismatched type error
+        # run a more nuanced query through sqlite
+        # to handle the mixed types
+        if "Mismatch Type Error" in str(duckdb_exc):
+            result = _sqlite_mixed_type_query_to_parquet(
+                source_path=example_sqlite_mixed_types_database,
+                table_name=table_name,
+                chunk_size=2,
+                offset=0,
+                result_filepath=result_filepath,
+            )
+
+    # check schema names
+    assert parquet.read_schema(where=result).names == [
+        "col_integer",
+        "col_text",
+        "col_blob",
+        "col_real",
+    ]
+    # check schema types
+    assert parquet.read_schema(where=result).types == [
+        pa.int64(),
+        pa.string(),
+        pa.binary(),
+        pa.float64(),
+    ]
+    # check the values per column
+    assert parquet.read_table(source=result).to_pydict() == {
+        "col_integer": [1, None],
+        "col_text": ["sample", "sample"],
+        "col_blob": [b"sample_blob", b"another_blob"],
+        "col_real": [0.5, None],
+    }
+
+
 def test_convert_hte_cellprofiler_csv(
    get_tempdir: str,
    data_dir_cellprofiler: str,