diff --git a/pycytominer/cyto_utils/cells.py b/pycytominer/cyto_utils/cells.py index 969f44e1..b5172019 100644 --- a/pycytominer/cyto_utils/cells.py +++ b/pycytominer/cyto_utils/cells.py @@ -419,13 +419,16 @@ def get_sql_table_col_names(self, table): return meta_cols, feat_cols - def load_compartment(self, compartment): + def load_compartment(self, compartment, float_datatype: type = np.float64): """Creates the compartment dataframe. Parameters ---------- compartment : str The compartment to process. + float_datatype: type, default np.float64 + Numpy floating point datatype to use for load_compartment and resulting dataframes. + Please note: using any type besides np.float64 is experimentally unverified. Returns ------- @@ -439,7 +442,7 @@ def load_compartment(self): num_meta, num_feats = len(meta_cols), len(feat_cols) # Use pre-allocated np.array for data - feats = np.empty(shape=(num_cells, num_feats), dtype=np.float64) + feats = np.empty(shape=(num_cells, num_feats), dtype=float_datatype) # Use pre-allocated pd.DataFrame for metadata metas = pd.DataFrame(columns=meta_cols, index=range(num_cells)) @@ -652,6 +655,8 @@ def merge_single_cells( single_cell_normalize: bool = False, normalize_args: Optional[Dict] = None, platemap: Optional[Union[str, pd.DataFrame]] = None, + sc_merge_chunksize: Optional[int] = None, + float_datatype: type = np.float64, **kwargs, ): """Given the linking columns, merge single cell data. Normalization is also supported. @@ -672,6 +677,13 @@ def merge_single_cells( Additional arguments passed as input to pycytominer.normalize(). platemap: str or pd.DataFrame, default None optional platemap filepath str or pd.DataFrame to be used with results via annotate + sc_merge_chunksize: int, default None + Chunksize for merge and concatenation operations to help address performance issues + note: if set to None, will infer a chunksize which is roughly 1/3 the row length + of the first component df. 
+ float_datatype: type, default np.float64 + Numpy floating point datatype to use for load_compartment and resulting dataframes. + Please note: using any type besides np.float64 is experimentally unverified. Returns ------- @@ -681,7 +693,7 @@ def merge_single_cells( """ # Load the single cell dataframe by merging on the specific linking columns - sc_df = "" + sc_df = pd.DataFrame() linking_check_cols = [] merge_suffix_rename = [] for left_compartment in self.compartment_linking_cols: @@ -704,8 +716,15 @@ def merge_single_cells( left_compartment ] - if isinstance(sc_df, str): - sc_df = self.load_compartment(compartment=left_compartment) + if sc_df.empty: + sc_df = self.load_compartment( + compartment=left_compartment, float_datatype=float_datatype + ) + + # if chunksize was not set, set it to roughly + # one third the size of our initial compartment + if sc_merge_chunksize is None: + sc_merge_chunksize = round(len(sc_df) / 3) if compute_subsample: # Sample cells proportionally by self.strata @@ -719,20 +738,27 @@ def merge_single_cells( sc_df, how="left", on=subset_logic_df.columns.tolist() ).reindex(sc_df.columns, axis="columns") - sc_df = sc_df.merge( - self.load_compartment(compartment=right_compartment), - left_on=self.merge_cols + [left_link_col], - right_on=self.merge_cols + [right_link_col], - suffixes=merge_suffix, - ) - - else: - sc_df = sc_df.merge( - self.load_compartment(compartment=right_compartment), - left_on=self.merge_cols + [left_link_col], - right_on=self.merge_cols + [right_link_col], - suffixes=merge_suffix, - ) + # perform a segmented merge using pd.concat and + # sc_merge_chunksize to help constrain memory + sc_df = pd.concat( + [ + self.load_compartment( + compartment=right_compartment, float_datatype=float_datatype + ).merge( + right=right_chunk, + # note: we reverse left and right for join key merge order reference + left_on=self.merge_cols + [right_link_col], + right_on=self.merge_cols + [left_link_col], + # note: we reverse left and right 
for join keys + suffixes=reversed(merge_suffix), + how="inner", + ) + for right_chunk in [ + sc_df[i : i + sc_merge_chunksize] + for i in range(0, sc_df.shape[0], sc_merge_chunksize) + ] + ] + ) linking_check_cols.append(linking_check) @@ -759,8 +785,20 @@ def merge_single_cells( self.load_image() self.load_image_data = True + # perform a segmented merge using pd.concat and + # sc_merge_chunksize to help constrain memory sc_df = ( - self.image_df.merge(sc_df, on=self.merge_cols, how="right") + pd.concat( + [ + self.image_df.merge( + right=right_chunk, on=self.merge_cols, how="right" + ) + for right_chunk in [ + sc_df[i : i + sc_merge_chunksize] + for i in range(0, sc_df.shape[0], sc_merge_chunksize) + ] + ] + ) # pandas rename performance may be improved using copy=False, inplace=False # reference: https://ryanlstevens.github.io/2022-05-06-pandasColumnRenaming/ .rename( @@ -769,6 +807,10 @@ def merge_single_cells( self.full_merge_suffix_rename, axis="columns", copy=False, inplace=False ) ) + + # reset the index to address above concat merges and memory conservation (inplace) + sc_df.reset_index(inplace=True, drop=True) + if single_cell_normalize: # Infering features is tricky with non-canonical data if normalize_args is None: diff --git a/pycytominer/tests/test_cyto_utils/test_cells.py b/pycytominer/tests/test_cyto_utils/test_cells.py index db347d7e..e766a202 100644 --- a/pycytominer/tests/test_cyto_utils/test_cells.py +++ b/pycytominer/tests/test_cyto_utils/test_cells.py @@ -3,6 +3,7 @@ import random import tempfile +import numpy as np import pandas as pd import pytest from pycytominer import aggregate, annotate, normalize @@ -255,6 +256,23 @@ def test_load_compartment(): check_dtype=False, ) + # test using non-default float_datatype + loaded_compartment_df = AP.load_compartment( + compartment="cells", float_datatype=np.float32 + ) + pd.testing.assert_frame_equal( + loaded_compartment_df, + CELLS_DF.astype( + # cast any float type columns to float32 for expected 
comparison + { + colname: np.float32 + for colname in CELLS_DF.columns + if pd.api.types.is_float(CELLS_DF[colname].dtype) + } + ).reindex(columns=loaded_compartment_df.columns), + check_dtype=False, + ) + def test_sc_count_sql_table(): # Iterate over initialized compartments @@ -273,6 +291,10 @@ def test_get_sql_table_col_names(): def test_merge_single_cells(): + """ + Testing various SingleCells.merge_single_cells functionality + """ + sc_merged_df = AP.merge_single_cells() # Assert that the image data was merged @@ -300,21 +322,43 @@ def test_merge_single_cells(): ) # Confirm that the merge correctly reversed the object number (opposite from Parent) - assert ( - sc_merged_df.Metadata_ObjectNumber_cytoplasm.tolist()[::-1] - == sc_merged_df.Metadata_ObjectNumber.tolist() - ) - assert ( - manual_merge.Metadata_ObjectNumber_cytoplasm.tolist()[::-1] - == sc_merged_df.Metadata_ObjectNumber.tolist() - ) - assert ( - manual_merge.Metadata_ObjectNumber_cytoplasm.tolist()[::-1] - == sc_merged_df.Metadata_ObjectNumber.tolist() + assert_cols = [ + "Metadata_ObjectNumber", + "Metadata_ObjectNumber_cytoplasm", + "Metadata_ObjectNumber_cells", + ] + + # check that we have the same data using same cols, sort and a reset index + pd.testing.assert_frame_equal( + left=manual_merge[assert_cols] + .sort_values(by=assert_cols, ascending=True) + .reset_index(drop=True), + right=sc_merged_df[assert_cols] + .sort_values(by=assert_cols, ascending=True) + .reset_index(drop=True), + check_dtype=False, ) - assert ( - manual_merge.Metadata_ObjectNumber_cells.tolist() - == sc_merged_df.Metadata_ObjectNumber.tolist() + + # use non-default float_datatype + sc_merged_df = AP.merge_single_cells(float_datatype=np.float32) + + # similar to the assert above, we test non-default float dtype specification + pd.testing.assert_frame_equal( + left=manual_merge[assert_cols] + .astype( + # cast any float type columns to float32 for expected comparison + { + colname: np.float32 + for colname in 
manual_merge.columns + if pd.api.types.is_float(manual_merge[colname].dtype) + } + ) + .sort_values(by=assert_cols, ascending=True) + .reset_index(drop=True), + right=sc_merged_df[assert_cols] + .sort_values(by=assert_cols, ascending=True) + .reset_index(drop=True), + check_dtype=False, ) # Confirm the merge and adding merge options @@ -335,9 +379,14 @@ def test_merge_single_cells(): manual_merge, method=method, samples=samples, features=features ) + # compare data using identical column order, sorting, and reset index pd.testing.assert_frame_equal( - norm_method_df.sort_index(axis=1), - manual_merge_normalize.sort_index(axis=1), + norm_method_df[norm_method_df.columns] + .sort_values(by="Cells_a") + .reset_index(drop=True), + manual_merge_normalize[norm_method_df.columns] + .sort_values(by="Cells_a") + .reset_index(drop=True), check_dtype=False, ) @@ -345,9 +394,26 @@ def test_merge_single_cells(): new_sc_merge_df = AP_NEW.merge_single_cells() assert sum(new_sc_merge_df.columns.str.startswith("New")) == 4 - assert ( - NEW_COMPARTMENT_DF.ObjectNumber.tolist()[::-1] - == new_sc_merge_df.Metadata_ObjectNumber_new.tolist() + + assert_cols = [ + "New_a", + "New_b", + "New_c", + "New_d", + "Metadata_ObjectNumber_new", + ] + # compare data using identical column order, sorting, and reset index + # note: we rename NEW_COMPARTMENT_DF to match new_sc_merge_df's ObjectNumber colname + pd.testing.assert_frame_equal( + left=NEW_COMPARTMENT_DF.rename( + columns={"ObjectNumber": "Metadata_ObjectNumber_new"} + )[assert_cols] + .sort_values(by=assert_cols) + .reset_index(drop=True), + right=new_sc_merge_df[assert_cols] + .sort_values(by=assert_cols) + .reset_index(drop=True), + check_dtype=False, ) norm_new_method_df = AP_NEW.merge_single_cells( @@ -471,7 +537,6 @@ def test_merge_single_cells_cytominer_database_test_file(): f"{os.path.dirname(__file__)}/../test_data/cytominer_database_example_data/test_SQ00014613.parquet", ) sql_url = f"sqlite:///{sql_path}" - print(sql_url) # 
build SingleCells from database sc_p = SingleCells( @@ -493,8 +558,8 @@ def test_merge_single_cells_cytominer_database_test_file(): # note: pd.DataFrame datatypes sometimes appear automatically changed on-read, so we cast # the result_file dataframe using the base dataframe's types. pd.testing.assert_frame_equal( - pd.read_csv(csv_path).astype(merged_sc.dtypes.to_dict()), - pd.read_csv(result_file).astype(merged_sc.dtypes.to_dict()), + pd.read_csv(csv_path).astype(merged_sc.dtypes.to_dict())[merged_sc.columns], + pd.read_csv(result_file).astype(merged_sc.dtypes.to_dict())[merged_sc.columns], ) # test parquet output from merge_single_cells @@ -507,8 +572,12 @@ def test_merge_single_cells_cytominer_database_test_file(): # note: pd.DataFrame datatypes sometimes appear automatically changed on-read, so we cast # the result_file dataframe using the base dataframe's types. pd.testing.assert_frame_equal( - pd.read_parquet(parquet_path).astype(merged_sc.dtypes.to_dict()), - pd.read_parquet(result_file).astype(merged_sc.dtypes.to_dict()), + pd.read_parquet(parquet_path).astype(merged_sc.dtypes.to_dict())[ + merged_sc.columns + ], + pd.read_parquet(result_file).astype(merged_sc.dtypes.to_dict())[ + merged_sc.columns + ], ) # test parquet output from merge_single_cells with annotation meta