Reduce memory consumption of SingleCells.merge_single_cells #234

Closed · wants to merge 6 commits
82 changes: 62 additions & 20 deletions pycytominer/cyto_utils/cells.py
@@ -419,13 +419,16 @@ def get_sql_table_col_names(self, table):

return meta_cols, feat_cols

-    def load_compartment(self, compartment):
+    def load_compartment(self, compartment, float_datatype: type = np.float64):
"""Creates the compartment dataframe.

Parameters
----------
compartment : str
The compartment to process.
float_datatype: type, default np.float64
Numpy floating point datatype to use for load_compartment and resulting dataframes.
Please note: using any besides np.float64 are experimentally unverified.

Returns
-------
@@ -439,7 +442,7 @@ def load_compartment(self, compartment):
        num_meta, num_feats = len(meta_cols), len(feat_cols)

        # Use pre-allocated np.array for data
-        feats = np.empty(shape=(num_cells, num_feats), dtype=np.float64)
+        feats = np.empty(shape=(num_cells, num_feats), dtype=float_datatype)
        # Use pre-allocated pd.DataFrame for metadata
        metas = pd.DataFrame(columns=meta_cols, index=range(num_cells))
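To see why the `float_datatype` knob matters for memory, here is a minimal sketch (hypothetical array sizes, not from this PR) comparing the footprint of the pre-allocated feature matrix under each dtype:

```python
import numpy as np

# hypothetical sizes standing in for num_cells and num_feats
num_cells, num_feats = 100_000, 500

# float64 pre-allocation: 8 bytes per value
feats_64 = np.empty(shape=(num_cells, num_feats), dtype=np.float64)
# float32 pre-allocation: 4 bytes per value, half the memory
feats_32 = np.empty(shape=(num_cells, num_feats), dtype=np.float32)

print(feats_64.nbytes / 1e6)  # 400.0 (MB)
print(feats_32.nbytes / 1e6)  # 200.0 (MB)
```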

@@ -652,6 +655,8 @@ def merge_single_cells(
        single_cell_normalize: bool = False,
        normalize_args: Optional[Dict] = None,
        platemap: Optional[Union[str, pd.DataFrame]] = None,
+        sc_merge_chunksize: Optional[int] = None,
+        float_datatype: type = np.float64,
        **kwargs,
    ):
        """Given the linking columns, merge single cell data. Normalization is also supported.
@@ -672,6 +677,13 @@
            Additional arguments passed as input to pycytominer.normalize().
        platemap: str or pd.DataFrame, default None
            Optional platemap filepath str or pd.DataFrame to be used with results via annotate.
+        sc_merge_chunksize: int, default None
+            Chunksize for merge and concatenation operations, to help address performance issues.
+            Note: if set to None, a chunksize of roughly one third the row count of the
+            first compartment dataframe is inferred.
+        float_datatype: type, default np.float64
+            Numpy floating point datatype to use for load_compartment and the resulting dataframes.
+            Please note: using any type besides np.float64 is experimentally unverified.

Returns
-------
@@ -681,7 +693,7 @@
"""

        # Load the single cell dataframe by merging on the specific linking columns
-        sc_df = ""
+        sc_df = pd.DataFrame()
linking_check_cols = []
merge_suffix_rename = []
for left_compartment in self.compartment_linking_cols:
@@ -704,8 +716,15 @@
                    left_compartment
                ]

-                if isinstance(sc_df, str):
-                    sc_df = self.load_compartment(compartment=left_compartment)
+                if sc_df.empty:
+                    sc_df = self.load_compartment(
+                        compartment=left_compartment, float_datatype=float_datatype
+                    )
+
+                    # if chunksize was not set, set it to roughly
+                    # one third the size of our initial compartment
+                    if sc_merge_chunksize is None:
+                        sc_merge_chunksize = round(len(sc_df) / 3)
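The inferred chunksize above drives plain row slicing of the accumulated dataframe. A self-contained sketch of that slicing, using a hypothetical frame:

```python
import pandas as pd

# hypothetical frame standing in for the first loaded compartment
sc_df = pd.DataFrame({"Cells_a": range(10)})

# infer roughly one third of the row count, mirroring the default above
sc_merge_chunksize = round(len(sc_df) / 3)  # 3

# the row slices that later feed the chunked merges
chunks = [
    sc_df[i : i + sc_merge_chunksize]
    for i in range(0, sc_df.shape[0], sc_merge_chunksize)
]
print([len(chunk) for chunk in chunks])  # [3, 3, 3, 1]
```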

if compute_subsample:
# Sample cells proportionally by self.strata
@@ -719,20 +738,27 @@
sc_df, how="left", on=subset_logic_df.columns.tolist()
).reindex(sc_df.columns, axis="columns")

-                    sc_df = sc_df.merge(
-                        self.load_compartment(compartment=right_compartment),
-                        left_on=self.merge_cols + [left_link_col],
-                        right_on=self.merge_cols + [right_link_col],
-                        suffixes=merge_suffix,
-                    )
-
-                else:
-                    sc_df = sc_df.merge(
-                        self.load_compartment(compartment=right_compartment),
-                        left_on=self.merge_cols + [left_link_col],
-                        right_on=self.merge_cols + [right_link_col],
-                        suffixes=merge_suffix,
-                    )
+                # perform a segmented merge using pd.concat and
+                # sc_merge_chunksize to help constrain memory
+                sc_df = pd.concat(
+                    [
+                        self.load_compartment(
+                            compartment=right_compartment, float_datatype=float_datatype
+                        ).merge(
+                            right=right_chunk,
+                            # note: we reverse left and right for join key merge order reference
+                            left_on=self.merge_cols + [right_link_col],
+                            right_on=self.merge_cols + [left_link_col],
+                            # note: we reverse left and right for join keys
+                            suffixes=reversed(merge_suffix),
+                            how="inner",
+                        )
+                        for right_chunk in [
+                            sc_df[i : i + sc_merge_chunksize]
+                            for i in range(0, sc_df.shape[0], sc_merge_chunksize)
+                        ]
+                    ]
+                )
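The pattern above merges one slice of the accumulated frame at a time, so only a chunk-sized intermediate plus the growing result is held in memory at once. A self-contained sketch of the same segmented merge, with hypothetical compartment frames and a hypothetical link column:

```python
import pandas as pd

# hypothetical stand-ins for the accumulated frame and the next compartment
sc_df = pd.DataFrame({"ObjectNumber": range(6), "Cells_a": range(6)})
right_compartment = pd.DataFrame({"ObjectNumber": range(6), "Nuclei_b": range(6)})

sc_merge_chunksize = 2

# merge chunk by chunk, then concatenate the partial results
sc_df = pd.concat(
    [
        right_compartment.merge(right=right_chunk, on="ObjectNumber", how="inner")
        for right_chunk in [
            sc_df[i : i + sc_merge_chunksize]
            for i in range(0, sc_df.shape[0], sc_merge_chunksize)
        ]
    ]
)
print(sc_df.shape)  # (6, 3)
```

Because an inner merge preserves the order of the left keys, loading the new compartment on the left is what motivates the reversed join keys and suffixes in the diff above.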

linking_check_cols.append(linking_check)

@@ -759,8 +785,20 @@
            self.load_image()
            self.load_image_data = True

+        # perform a segmented merge using pd.concat and
+        # sc_merge_chunksize to help constrain memory
        sc_df = (
-            self.image_df.merge(sc_df, on=self.merge_cols, how="right")
+            pd.concat(
+                [
+                    self.image_df.merge(
+                        right=right_chunk, on=self.merge_cols, how="right"
+                    )
+                    for right_chunk in [
+                        sc_df[i : i + sc_merge_chunksize]
+                        for i in range(0, sc_df.shape[0], sc_merge_chunksize)
+                    ]
+                ]
+            )
            # pandas rename performance may be improved using copy=False, inplace=False
            # reference: https://ryanlstevens.github.io/2022-05-06-pandasColumnRenaming/
            .rename(
@@ -769,6 +807,10 @@
                self.full_merge_suffix_rename, axis="columns", copy=False, inplace=False
            )
        )
+
+        # reset the index to address above concat merges and memory conservation (inplace)
+        sc_df.reset_index(inplace=True, drop=True)
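On the rename performance comment above, a minimal sketch of the chained rename with copy=False and inplace=False (hypothetical column maps; note that newer pandas versions deprecate the copy keyword under copy-on-write):

```python
import pandas as pd

# hypothetical frame and column maps
df = pd.DataFrame({"Parent_Cells": [1], "ObjectNumber_cells": [2]})

# two chained renames; copy=False asks pandas not to copy the
# underlying data at each rename step
df = df.rename(
    {"Parent_Cells": "Metadata_Parent_Cells"}, axis="columns", copy=False, inplace=False
).rename(
    {"ObjectNumber_cells": "Metadata_ObjectNumber_cells"}, axis="columns", copy=False
)

print(df.columns.tolist())  # ['Metadata_Parent_Cells', 'Metadata_ObjectNumber_cells']
```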

if single_cell_normalize:
# Inferring features is tricky with non-canonical data
if normalize_args is None:
117 changes: 93 additions & 24 deletions pycytominer/tests/test_cyto_utils/test_cells.py
@@ -3,6 +3,7 @@
import random
import tempfile

+import numpy as np
import pandas as pd
import pytest
from pycytominer import aggregate, annotate, normalize
@@ -255,6 +256,23 @@ def test_load_compartment():
        check_dtype=False,
    )

+    # test using non-default float_datatype
+    loaded_compartment_df = AP.load_compartment(
+        compartment="cells", float_datatype=np.float32
+    )
+    pd.testing.assert_frame_equal(
+        loaded_compartment_df,
+        CELLS_DF.astype(
+            # cast any float type columns to float32 for expected comparison
+            {
+                colname: np.float32
+                for colname in CELLS_DF.columns
+                if pd.api.types.is_float(CELLS_DF[colname].dtype)
+            }
+        ).reindex(columns=loaded_compartment_df.columns),
+        check_dtype=False,
+    )
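The expected-value cast above builds its astype mapping with a dict comprehension. A standalone sketch of the same cast, here written against the column dtype using pandas' is_float_dtype and a hypothetical frame:

```python
import numpy as np
import pandas as pd

# hypothetical frame with one integer and one float column
df = pd.DataFrame({"Metadata_ObjectNumber": [1, 2], "Cells_a": [0.1, 0.2]})

# cast every float column (and only float columns) to float32
cast = df.astype(
    {
        colname: np.float32
        for colname in df.columns
        if pd.api.types.is_float_dtype(df[colname])
    }
)
print(cast.dtypes.to_dict())
# {'Metadata_ObjectNumber': dtype('int64'), 'Cells_a': dtype('float32')}
```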


def test_sc_count_sql_table():
# Iterate over initialized compartments
@@ -273,6 +291,10 @@ def test_get_sql_table_col_names():


def test_merge_single_cells():
+    """
+    Testing various SingleCells.merge_single_cells functionality
+    """
+
    sc_merged_df = AP.merge_single_cells()

# Assert that the image data was merged
@@ -300,21 +322,43 @@
)

    # Confirm that the merge correctly reversed the object number (opposite from Parent)
-    assert (
-        sc_merged_df.Metadata_ObjectNumber_cytoplasm.tolist()[::-1]
-        == sc_merged_df.Metadata_ObjectNumber.tolist()
-    )
-    assert (
-        manual_merge.Metadata_ObjectNumber_cytoplasm.tolist()[::-1]
-        == sc_merged_df.Metadata_ObjectNumber.tolist()
-    )
-    assert (
-        manual_merge.Metadata_ObjectNumber_cytoplasm.tolist()[::-1]
-        == sc_merged_df.Metadata_ObjectNumber.tolist()
-    )
+    assert_cols = [
+        "Metadata_ObjectNumber",
+        "Metadata_ObjectNumber_cytoplasm",
+        "Metadata_ObjectNumber_cells",
+    ]
+
+    # check that we have the same data using same cols, sort and a reset index
+    pd.testing.assert_frame_equal(
+        left=manual_merge[assert_cols]
+        .sort_values(by=assert_cols, ascending=True)
+        .reset_index(drop=True),
+        right=sc_merged_df[assert_cols]
+        .sort_values(by=assert_cols, ascending=True)
+        .reset_index(drop=True),
+        check_dtype=False,
+    )
-    assert (
-        manual_merge.Metadata_ObjectNumber_cells.tolist()
-        == sc_merged_df.Metadata_ObjectNumber.tolist()
-    )

Review comment (Member): this doesn't look blacked to me, please confirm

Reply (Member Author): I ran black on this, and --check doesn't seem to flag it (it may be profile related). I see that a list end bumps up against the comment; I'll add a newline there with the new changes. Are you seeing anything else that looks stylistically out of place that I can change?

Review comment (Member): I am a bit concerned that sorting is giving us undue confidence of equivalent results. Am I reading this wrong? How should I be thinking about this? Can you add an explicit check that the object number is the same? Oh, but maybe this code block is doing exactly that (and more!)?

Reply (Member Author): Great comment, thank you! The chunked merges made the data inconsistent with the existing tests. The check here now compares the columns "Metadata_ObjectNumber", "Metadata_ObjectNumber_cytoplasm", and "Metadata_ObjectNumber_cells", sorted by those same columns in the same order, for data equivalency, so it does include a Metadata_ObjectNumber check as a result. Would users of merge_single_cells expect output sorted by the same (or similar) columns? We could include this as an additional step within the method, which would likely avoid needing to do that manually within testing here.
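A minimal illustration of the sort-then-compare approach discussed in the thread above, using hypothetical frames that hold the same rows in different orders:

```python
import pandas as pd

a = pd.DataFrame({"Metadata_ObjectNumber": [3, 1, 2], "Cells_a": [0.3, 0.1, 0.2]})
b = pd.DataFrame({"Metadata_ObjectNumber": [1, 2, 3], "Cells_a": [0.1, 0.2, 0.3]})

# sorting by the compared columns and resetting the index makes the
# comparison order-insensitive while still checking every value
assert_cols = ["Metadata_ObjectNumber", "Cells_a"]
pd.testing.assert_frame_equal(
    a[assert_cols].sort_values(by=assert_cols).reset_index(drop=True),
    b[assert_cols].sort_values(by=assert_cols).reset_index(drop=True),
)
```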

+    # use non-default float_datatype
+    sc_merged_df = AP.merge_single_cells(float_datatype=np.float32)
+
+    # similar to the assert above, we test non-default float dtype specification
+    pd.testing.assert_frame_equal(
+        left=manual_merge[assert_cols]
+        .astype(
+            # cast any float type columns to float32 for expected comparison
+            {
+                colname: np.float32
+                for colname in manual_merge.columns
+                if pd.api.types.is_float(manual_merge[colname].dtype)
+            }
+        )
+        .sort_values(by=assert_cols, ascending=True)
+        .reset_index(drop=True),
+        right=sc_merged_df[assert_cols]
+        .sort_values(by=assert_cols, ascending=True)
+        .reset_index(drop=True),
+        check_dtype=False,
+    )

# Confirm the merge and adding merge options
@@ -335,19 +379,41 @@ def test_merge_single_cells():
manual_merge, method=method, samples=samples, features=features
)

+    # compare data using identical column order, sorting, and reset index
    pd.testing.assert_frame_equal(
-        norm_method_df.sort_index(axis=1),
-        manual_merge_normalize.sort_index(axis=1),
+        norm_method_df[norm_method_df.columns]
+        .sort_values(by="Cells_a")
+        .reset_index(drop=True),
+        manual_merge_normalize[norm_method_df.columns]
+        .sort_values(by="Cells_a")
+        .reset_index(drop=True),
        check_dtype=False,
    )

# Test non-canonical compartment merging
new_sc_merge_df = AP_NEW.merge_single_cells()

    assert sum(new_sc_merge_df.columns.str.startswith("New")) == 4
-    assert (
-        NEW_COMPARTMENT_DF.ObjectNumber.tolist()[::-1]
-        == new_sc_merge_df.Metadata_ObjectNumber_new.tolist()
-    )
+
+    assert_cols = [
+        "New_a",
+        "New_b",
+        "New_c",
+        "New_d",
+        "Metadata_ObjectNumber_new",
+    ]
+    # compare data using identical column order, sorting, and reset index
+    # note: we rename NEW_COMPARTMENT_DF to match new_sc_merge_df's ObjectNumber colname
+    pd.testing.assert_frame_equal(
+        left=NEW_COMPARTMENT_DF.rename(
+            columns={"ObjectNumber": "Metadata_ObjectNumber_new"}
+        )[assert_cols]
+        .sort_values(by=assert_cols)
+        .reset_index(drop=True),
+        right=new_sc_merge_df[assert_cols]
+        .sort_values(by=assert_cols)
+        .reset_index(drop=True),
+        check_dtype=False,
+    )

norm_new_method_df = AP_NEW.merge_single_cells(
@@ -471,7 +537,6 @@ def test_merge_single_cells_cytominer_database_test_file():
f"{os.path.dirname(__file__)}/../test_data/cytominer_database_example_data/test_SQ00014613.parquet",
)
sql_url = f"sqlite:///{sql_path}"
print(sql_url)

# build SingleCells from database
sc_p = SingleCells(
@@ -493,8 +558,8 @@ def test_merge_single_cells_cytominer_database_test_file():
    # note: pd.DataFrame datatypes sometimes appear automatically changed on-read, so we cast
    # the result_file dataframe using the base dataframe's types.
    pd.testing.assert_frame_equal(
-        pd.read_csv(csv_path).astype(merged_sc.dtypes.to_dict()),
-        pd.read_csv(result_file).astype(merged_sc.dtypes.to_dict()),
+        pd.read_csv(csv_path).astype(merged_sc.dtypes.to_dict())[merged_sc.columns],
+        pd.read_csv(result_file).astype(merged_sc.dtypes.to_dict())[merged_sc.columns],
    )

Review comment (Member): why is this necessary now?

Reply (Member Author): Thank you for calling this out! For inner merges, pandas "...preserve[s] the order of the left keys" (reference: pd.merge(..., how)). The column specification here accounts for the now differently ordered columns, due to "right" and "left" being swapped within the compartment merges. I also felt that accounting for exact column ordering over time may be unwieldy, so this may benefit development velocity. That said, we may need a change; do you think we should enforce a strict column order within merge_single_cells to remove the need for the specification here?

Reply (Member Author): @gwaybio - just wanted to follow up here with some thoughts about merge_single_cells column order specification. If we make explicit column sorting a part of the method, we should be able to reduce the variability of the data produced. Note: tests may still require column filtering (for example, if certain data do not exist in one dataframe vs another).

I wrote the following as an example of what we could do here:

```python
import pandas as pd

# create an example dataframe with mixed order columns
df = pd.DataFrame(
    {
        "Image_Metadata_1": [0],
        "TableNumber": [0],
        "ImageNumber": [0],
        "Cytoplasm_Data_1": [0],
        "Nuclei_Data_1": [0],
        "Cells_Data_1": [0],
        "Mito_Data_1": [0],
        "Actin_Data_1": [0],
        "Image_Data_1": [0],
    }
)

# print df columns as a python list, representing the initial order
print("Initial order:")
print(df.columns.tolist())


def custom_sort(value: str):
    """
    A custom sort for Pycytominer merge_single_cells
    pd.DataFrame columns
    """

    # lowercase str which will be used for comparisons
    # to avoid any capitalization challenges
    value_lower = value.lower()

    # first sorted values (by list index)
    sort_first = ["tablenumber", "imagenumber"]

    # middle sort value
    sort_middle = "metadata"

    # sorted last (by list order enumeration)
    sort_later = [
        "cells",
        "cytoplasm",
        "nuclei",
        "image",
    ]

    # if value is in the sort_first list
    # return the index from that list
    if value_lower in sort_first:
        return sort_first.index(value_lower)

    # if sort_middle is anywhere in value return
    # next index value after sort_first values
    elif sort_middle in value_lower:
        return len(sort_first)

    # if any sort_later are found as the first part of value
    # return enumerated index of sort_later value (starting from
    # relative len based on the above conditionals and lists)
    elif any(value_lower.startswith(val) for val in sort_later):
        for k, v in enumerate(sort_later, start=len(sort_first) + 1):
            if value_lower.startswith(v):
                return k

    # else we return the total length of all sort values
    return len(sort_first) + len(sort_later) + 1


# inner sorted alphabetizes any columns which may not be part of custom_sort
# outer sort provides pycytominer-specific column sort order
df = df[sorted(sorted(df.columns), key=custom_sort)]

# print df columns as a python list, representing the new order
print("\nSorted order:")
print(df.columns.tolist())
```

Which has printed output:

```
Initial order:
['Image_Metadata_1', 'TableNumber', 'ImageNumber', 'Cytoplasm_Data_1', 'Nuclei_Data_1', 'Cells_Data_1', 'Mito_Data_1', 'Actin_Data_1', 'Image_Data_1']

Sorted order:
['TableNumber', 'ImageNumber', 'Image_Metadata_1', 'Cells_Data_1', 'Cytoplasm_Data_1', 'Nuclei_Data_1', 'Image_Data_1', 'Actin_Data_1', 'Mito_Data_1']
```

# test parquet output from merge_single_cells
@@ -507,8 +572,12 @@ def test_merge_single_cells_cytominer_database_test_file():
    # note: pd.DataFrame datatypes sometimes appear automatically changed on-read, so we cast
    # the result_file dataframe using the base dataframe's types.
    pd.testing.assert_frame_equal(
-        pd.read_parquet(parquet_path).astype(merged_sc.dtypes.to_dict()),
-        pd.read_parquet(result_file).astype(merged_sc.dtypes.to_dict()),
+        pd.read_parquet(parquet_path).astype(merged_sc.dtypes.to_dict())[
+            merged_sc.columns
+        ],
+        pd.read_parquet(result_file).astype(merged_sc.dtypes.to_dict())[
+            merged_sc.columns
+        ],
    )

# test parquet output from merge_single_cells with annotation meta