From 549463641129dcc451023f04290dfeb028708087 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Karl=20Sv=C3=A4rd?= Date: Fri, 15 Mar 2024 16:51:33 +0100 Subject: [PATCH] restructure file and add more type hints --- cg_lims/EPPs/udf/calculate/qpcr_dilution.py | 105 ++++++++++---------- 1 file changed, 53 insertions(+), 52 deletions(-) diff --git a/cg_lims/EPPs/udf/calculate/qpcr_dilution.py b/cg_lims/EPPs/udf/calculate/qpcr_dilution.py index a59f2f9a..bd1d0f18 100644 --- a/cg_lims/EPPs/udf/calculate/qpcr_dilution.py +++ b/cg_lims/EPPs/udf/calculate/qpcr_dilution.py @@ -17,42 +17,6 @@ LOG = logging.getLogger(__name__) -def parse_quantification_summary(summary_file: str) -> Dict: - """Parse Quantification Summary excel file and return python dict with - original wells as the keys and WellValues objects as the values.""" - df = pd.read_excel(summary_file) - quantification_data = {} - for index, row in df.iterrows(): - well = row["Well"] - cq = round(row["Cq"], 3) - sq = row["SQ"] - if not (np.isnan(cq) or np.isnan(sq)): - orig_well = WELL_TRANSFORMER[well] - if orig_well not in quantification_data.keys(): - quantification_data[orig_well] = WellValues(well=orig_well) - quantification_data[orig_well].add_values(sq_value=sq, cq_value=cq) - return quantification_data - - -def calculate_molar_concentration(sq_values: List[float], size_bp: int): - """Calculate and return the molar concentration given a list of SQ values and a fragment size.""" - original_conc = mean(sq_values) * 10**4 - return original_conc * (452 / size_bp) - - -def get_index_of_biggest_outlier(values: List[float]) -> int: - """Return the index of the largest outlier in the given list of values.""" - mean_value = mean(values) - dev_from_mean = [abs(value - mean_value) for value in values] - max_dev = max(dev_from_mean) - return dev_from_mean.index(max_dev) - - -def get_max_difference(values: List[float]) -> float: - """Return the difference between the largest and smallest values in a given list of values.""" - return max(values) - min(values) - - class WellValues: def __init__(self, well): self.well: str = well @@ -68,26 +32,26 @@ def add_values(self, sq_value: float, cq_value: float) -> None: def connect_artifact(self, artifact: Artifact) -> None: """Connect an artifact to the well object.""" - self.artifact = artifact + self.artifact: Artifact = artifact def _trim_outliers(self) -> str: """Remove the largest outlier of the current Cq and SQ values.""" - outlier_index = get_index_of_biggest_outlier(values=self.cq) - removed_cq_value = self.cq.pop(outlier_index) - removed_sq_value = self.sq.pop(outlier_index) + outlier_index: int = get_index_of_biggest_outlier(values=self.cq) + removed_cq_value: float = self.cq.pop(outlier_index) + removed_sq_value: float = self.sq.pop(outlier_index) return f"Removed outlier Cq value {removed_cq_value} and SQ value {removed_sq_value} from {self.artifact.samples[0].id}." def get_concentration(self, cq_threshold: float, size_bp: int) -> float: """Return the concentration (M) of the well object, given a Cq difference threshold and fragment size.""" - cq_set_difference = get_max_difference(self.cq) + cq_set_difference: float = get_max_difference(self.cq) if cq_set_difference > cq_threshold: - message = ( + message: str = ( f" Cq value difference is too high between the replicates of sample {self.artifact.samples[0].id}.\n" f"Difference: {cq_set_difference}\n" f"Cq values: {self.cq}\n" f"SQ values: {self.sq}\n" ) - trim_log = self._trim_outliers() + trim_log: str = self._trim_outliers() message = message + trim_log LOG.info(message) return calculate_molar_concentration(sq_values=self.sq, size_bp=size_bp) @@ -96,7 +60,7 @@ def set_artifact_udfs( self, concentration_threshold: float, replicate_threshold: float, size_bp: int ) -> None: """Calculate and set all UDFs and QC flags of the connected artifact.""" - molar_concentration = self.get_concentration( + molar_concentration: float = self.get_concentration( cq_threshold=float(replicate_threshold), size_bp=int(size_bp) ) self.artifact.qc_flag = "PASSED" @@ -112,6 +76,41 @@ def set_artifact_udfs( self.artifact.put() +def parse_quantification_summary(summary_file: str) -> Dict[str, WellValues]: + """Parse Quantification Summary excel file and return python dict with + original wells as the keys and WellValues objects as the values.""" + df: pd.DataFrame = pd.read_excel(summary_file) + quantification_data: Dict = {} + for index, row in df.iterrows(): + well: str = row["Well"] + cq: float = round(row["Cq"], 3) + sq: float = row["SQ"] + if not (np.isnan(cq) or np.isnan(sq)): + orig_well: str = WELL_TRANSFORMER[well] + if orig_well not in quantification_data.keys(): + quantification_data[orig_well] = WellValues(well=orig_well) + quantification_data[orig_well].add_values(sq_value=sq, cq_value=cq) + return quantification_data + + +def calculate_molar_concentration(sq_values: List[float], size_bp: int): + """Calculate and return the molar concentration given a list of SQ values and a fragment size.""" + return (mean(sq_values) * 10**4) * (452 / size_bp) + + +def get_index_of_biggest_outlier(values: List[float]) -> int: + """Return the index of the largest outlier in the given list of values.""" + mean_value: float = mean(values) + dev_from_mean: List[float] = [abs(value - mean_value) for value in values] + max_dev: float = max(dev_from_mean) + return dev_from_mean.index(max_dev) + + +def get_max_difference(values: List[float]) -> float: + """Return the difference between the largest and smallest values in a given list of values.""" + return max(values) - min(values) + + @click.command() @options.file_placeholder(help="qPCR result file placeholder name.") @options.local_file() @@ -133,20 +132,22 @@ def qpcr_dilution( process: Process = ctx.obj["process"] if local_file: - file_path = local_file + file_path: str = local_file else: - file_art = get_artifact_by_name(process=process, name=file) - file_path = get_file_path(file_art) + file_art: Artifact = get_artifact_by_name(process=process, name=file) + file_path: str = get_file_path(file_art) if not Path(file_path).is_file(): raise MissingFileError(f"No such file: {file_path}") try: artifacts: List[Artifact] = get_artifacts(process=process, measurement=True) - quantification_data = parse_quantification_summary(summary_file=file_path) - failed_samples = 0 + quantification_data: Dict[str, WellValues] = parse_quantification_summary( + summary_file=file_path + ) + failed_samples: int = 0 for artifact in artifacts: - artifact_well = artifact.location[1] + artifact_well: str = artifact.location[1] if artifact_well not in quantification_data.keys(): raise MissingValueError( f"No values found for well {artifact_well} in the result file! " @@ -163,12 +164,12 @@ def qpcr_dilution( failed_samples += 1 if failed_samples: - error_message = ( + error_message: str = ( f" {failed_samples} sample(s) failed the QC! See the logs for further information." ) raise FailingQCError(error_message) - message = " Concentrations have been calculated and set for all samples!" + message: str = " Concentrations have been calculated and set for all samples!" LOG.info(message) click.echo(message) except LimsError as e: