Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add SQLite mixed-type compatibility for _get_table_columns_and_types #82

Merged
merged 4 commits into from
Aug 2, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 49 additions & 11 deletions cytotable/convert.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@ def _get_table_columns_and_types(source: Dict[str, Any]) -> List[Dict[str, str]]

import pathlib

from cytotable.utils import _duckdb_reader
import duckdb

from cytotable.utils import _duckdb_reader, _sqlite_mixed_type_query_to_parquet

source_path = source["source_path"]
source_type = str(pathlib.Path(source_path).suffix).lower()
Expand All @@ -49,12 +51,12 @@ def _get_table_columns_and_types(source: Dict[str, Any]) -> List[Dict[str, str]]

# query top 5 results from table and use pragma_storage_info() to
# gather duckdb interpreted data typing
select_query = f"""
select_query = """
/* we create an in-mem table for later use with the pragma_storage_info call
as this call only functions with materialized tables and not views or related */
CREATE TABLE column_details AS
(SELECT *
FROM {select_source}
FROM &select_source
LIMIT 5
);

Expand All @@ -69,8 +71,39 @@ def _get_table_columns_and_types(source: Dict[str, Any]) -> List[Dict[str, str]]
WHERE segment_type != 'VALIDITY';
"""

# perform the query and create a list of dictionaries with the column data for table
return _duckdb_reader().execute(select_query).arrow().to_pylist()
# attempt to read the data to parquet from duckdb
# with exception handling to read mixed-type data
# using sqlite3 and a special utility function
try:
# perform the query through an isolated duckdb connection and
# return a list of dictionaries with the column data for the table
return (
_duckdb_reader()
.execute(select_query.replace("&select_source", select_source))
.arrow()
.to_pylist()
)

except duckdb.Error as e:
# if we see a mismatched type error
# run a more nuanced query through sqlite
# to handle the mixed types
if "Mismatch Type Error" in str(e) and source_type == ".sqlite":
arrow_data_tbl = _sqlite_mixed_type_query_to_parquet(
source_path=str(source["source_path"]),
table_name=str(source["table_name"]),
chunk_size=5,
d33bs marked this conversation as resolved.
Show resolved Hide resolved
offset=0,
)
return (
_duckdb_reader()
.execute(select_query.replace("&select_source", "arrow_data_tbl"))
.arrow()
.to_pylist()
)
else:
raise


@python_app
Expand Down Expand Up @@ -238,6 +271,7 @@ def _source_chunk_to_parquet(

import duckdb
from cloudpathlib import AnyPath
from pyarrow import parquet

from cytotable.utils import _duckdb_reader, _sqlite_mixed_type_query_to_parquet

Expand Down Expand Up @@ -292,13 +326,17 @@ def _source_chunk_to_parquet(
"Mismatch Type Error" in str(e)
and str(AnyPath(source["source_path"]).suffix).lower() == ".sqlite"
):
result_filepath = _sqlite_mixed_type_query_to_parquet(
source_path=str(source["source_path"]),
table_name=str(source["table_name"]),
chunk_size=chunk_size,
offset=offset,
result_filepath=result_filepath,
parquet.write_table(
table=_sqlite_mixed_type_query_to_parquet(
source_path=str(source["source_path"]),
table_name=str(source["table_name"]),
chunk_size=chunk_size,
offset=offset,
),
where=result_filepath,
)
else:
raise

# return the filepath for the chunked output file
return result_filepath
Expand Down
20 changes: 5 additions & 15 deletions cytotable/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,12 +162,11 @@ def _sqlite_mixed_type_query_to_parquet(
table_name: str,
chunk_size: int,
offset: int,
result_filepath: str,
) -> str:
"""
Performs SQLite table data extraction where one or many
columns include data values of potentially mismatched type
such that the data may be exported to Arrow and a Parquet file.
such that the data may be exported to Arrow for later use.
Args:
source_path: str:
Expand All @@ -178,17 +177,14 @@ def _sqlite_mixed_type_query_to_parquet(
Row count to use for chunked output.
offset: int:
The offset for chunking the data from source.
dest_path: str:
Path to store the output data.
Returns:
str:
The resulting filepath for the table exported to parquet.
pyarrow.Table:
The resulting Arrow table for the data.
"""
import sqlite3

import pyarrow as pa
import pyarrow.parquet as parquet

# open sqlite3 connection
with sqlite3.connect(source_path) as conn:
Expand Down Expand Up @@ -234,14 +230,8 @@ def _sqlite_mixed_type_query_to_parquet(
for row in cursor.fetchall()
]

# write results to a parquet file
parquet.write_table(
table=pa.Table.from_pylist(results),
where=result_filepath,
)

# return filepath
return result_filepath
# return arrow table with results
return pa.Table.from_pylist(results)


def _cache_cloudpath_to_local(path: Union[str, AnyPath]) -> pathlib.Path:
Expand Down
40 changes: 31 additions & 9 deletions tests/test_convert.py
Original file line number Diff line number Diff line change
Expand Up @@ -970,36 +970,58 @@ def test_sqlite_mixed_type_query_to_parquet(
# run a more nuanced query through sqlite
# to handle the mixed types
if "Mismatch Type Error" in str(duckdb_exc):
result = _sqlite_mixed_type_query_to_parquet(
source_path=example_sqlite_mixed_types_database,
table_name=table_name,
chunk_size=2,
offset=0,
result_filepath=result_filepath,
parquet.write_table(
table=_sqlite_mixed_type_query_to_parquet(
source_path=example_sqlite_mixed_types_database,
table_name=table_name,
chunk_size=2,
offset=0,
),
where=result_filepath,
)

# check schema names
assert parquet.read_schema(where=result).names == [
assert parquet.read_schema(where=result_filepath).names == [
"col_integer",
"col_text",
"col_blob",
"col_real",
]
# check schema types
assert parquet.read_schema(where=result).types == [
assert parquet.read_schema(where=result_filepath).types == [
pa.int64(),
pa.string(),
pa.binary(),
pa.float64(),
]
# check the values per column
assert parquet.read_table(source=result).to_pydict() == {
assert parquet.read_table(source=result_filepath).to_pydict() == {
"col_integer": [1, None],
"col_text": ["sample", "sample"],
"col_blob": [b"sample_blob", b"another_blob"],
"col_real": [0.5, None],
}

# run full convert on mixed type database
result = convert(
source_path=example_sqlite_mixed_types_database,
dest_path=result_filepath,
dest_datatype="parquet",
source_datatype="sqlite",
compartments=[table_name],
join=False,
)

# assert that the single table result looks like the following dictionary
assert parquet.read_table(
source=result["Tbl_a.sqlite"][0]["table"][0]
).to_pydict() == {
"Tbl_a_col_integer": [1, None],
"Tbl_a_col_text": ["sample", "sample"],
"Tbl_a_col_blob": [b"sample_blob", b"another_blob"],
"Tbl_a_col_real": [0.5, None],
}


def test_convert_hte_cellprofiler_csv(
get_tempdir: str,
Expand Down