Skip to content

Commit

Permalink
restructure file and add more type hints
Browse files Browse the repository at this point in the history
  • Loading branch information
Karl-Svard committed Mar 15, 2024
1 parent cd846d9 commit 5494636
Showing 1 changed file with 53 additions and 52 deletions.
105 changes: 53 additions & 52 deletions cg_lims/EPPs/udf/calculate/qpcr_dilution.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,42 +17,6 @@
LOG = logging.getLogger(__name__)


def parse_quantification_summary(summary_file: str) -> Dict:
"""Parse Quantification Summary excel file and return python dict with
original wells as the keys and WellValues objects as the values."""
df = pd.read_excel(summary_file)
quantification_data = {}
for index, row in df.iterrows():
well = row["Well"]
cq = round(row["Cq"], 3)
sq = row["SQ"]
if not (np.isnan(cq) or np.isnan(sq)):
orig_well = WELL_TRANSFORMER[well]
if orig_well not in quantification_data.keys():
quantification_data[orig_well] = WellValues(well=orig_well)
quantification_data[orig_well].add_values(sq_value=sq, cq_value=cq)
return quantification_data


def calculate_molar_concentration(sq_values: List[float], size_bp: int):
"""Calculate and return the molar concentration given a list of SQ values and a fragment size."""
original_conc = mean(sq_values) * 10**4
return original_conc * (452 / size_bp)


def get_index_of_biggest_outlier(values: List[float]) -> int:
"""Return the index of the largest outlier in the given list of values."""
mean_value = mean(values)
dev_from_mean = [abs(value - mean_value) for value in values]
max_dev = max(dev_from_mean)
return dev_from_mean.index(max_dev)


def get_max_difference(values: List[float]) -> float:
"""Return the difference between the largest and smallest values in a given list of values."""
return max(values) - min(values)


class WellValues:
def __init__(self, well):
self.well: str = well
Expand All @@ -68,26 +32,26 @@ def add_values(self, sq_value: float, cq_value: float) -> None:

def connect_artifact(self, artifact: Artifact) -> None:
"""Connect an artifact to the well object."""
self.artifact = artifact
self.artifact: Artifact = artifact

def _trim_outliers(self) -> str:
"""Remove the largest outlier of the current Cq and SQ values."""
outlier_index = get_index_of_biggest_outlier(values=self.cq)
removed_cq_value = self.cq.pop(outlier_index)
removed_sq_value = self.sq.pop(outlier_index)
outlier_index: int = get_index_of_biggest_outlier(values=self.cq)
removed_cq_value: float = self.cq.pop(outlier_index)
removed_sq_value: float = self.sq.pop(outlier_index)
return f"Removed outlier Cq value {removed_cq_value} and SQ value {removed_sq_value} from {self.artifact.samples[0].id}."

def get_concentration(self, cq_threshold: float, size_bp: int) -> float:
"""Return the concentration (M) of the well object, given a Cq difference threshold and fragment size."""
cq_set_difference = get_max_difference(self.cq)
cq_set_difference: float = get_max_difference(self.cq)
if cq_set_difference > cq_threshold:
message = (
message: str = (
f" Cq value difference is too high between the replicates of sample {self.artifact.samples[0].id}.\n"
f"Difference: {cq_set_difference}\n"
f"Cq values: {self.cq}\n"
f"SQ values: {self.sq}\n"
)
trim_log = self._trim_outliers()
trim_log: str = self._trim_outliers()
message = message + trim_log
LOG.info(message)
return calculate_molar_concentration(sq_values=self.sq, size_bp=size_bp)
Expand All @@ -96,7 +60,7 @@ def set_artifact_udfs(
self, concentration_threshold: float, replicate_threshold: float, size_bp: int
) -> None:
"""Calculate and set all UDFs and QC flags of the connected artifact."""
molar_concentration = self.get_concentration(
molar_concentration: float = self.get_concentration(
cq_threshold=float(replicate_threshold), size_bp=int(size_bp)
)
self.artifact.qc_flag = "PASSED"
Expand All @@ -112,6 +76,41 @@ def set_artifact_udfs(
self.artifact.put()


def parse_quantification_summary(summary_file: str) -> Dict[str, WellValues]:
"""Parse Quantification Summary excel file and return python dict with
original wells as the keys and WellValues objects as the values."""
df: pd.DataFrame = pd.read_excel(summary_file)
quantification_data: Dict = {}
for index, row in df.iterrows():
well: str = row["Well"]
cq: float = round(row["Cq"], 3)
sq: float = row["SQ"]
if not (np.isnan(cq) or np.isnan(sq)):
orig_well: str = WELL_TRANSFORMER[well]
if orig_well not in quantification_data.keys():
quantification_data[orig_well] = WellValues(well=orig_well)
quantification_data[orig_well].add_values(sq_value=sq, cq_value=cq)
return quantification_data


def calculate_molar_concentration(sq_values: List[float], size_bp: int):
"""Calculate and return the molar concentration given a list of SQ values and a fragment size."""
return (mean(sq_values) * 10**4) * (452 / size_bp)


def get_index_of_biggest_outlier(values: List[float]) -> int:
"""Return the index of the largest outlier in the given list of values."""
mean_value: float = mean(values)
dev_from_mean: List[float] = [abs(value - mean_value) for value in values]
max_dev: float = max(dev_from_mean)
return dev_from_mean.index(max_dev)


def get_max_difference(values: List[float]) -> float:
"""Return the difference between the largest and smallest values in a given list of values."""
return max(values) - min(values)


@click.command()
@options.file_placeholder(help="qPCR result file placeholder name.")
@options.local_file()
Expand All @@ -133,20 +132,22 @@ def qpcr_dilution(
process: Process = ctx.obj["process"]

if local_file:
file_path = local_file
file_path: str = local_file
else:
file_art = get_artifact_by_name(process=process, name=file)
file_path = get_file_path(file_art)
file_art: Artifact = get_artifact_by_name(process=process, name=file)
file_path: str = get_file_path(file_art)

if not Path(file_path).is_file():
raise MissingFileError(f"No such file: {file_path}")

try:
artifacts: List[Artifact] = get_artifacts(process=process, measurement=True)
quantification_data = parse_quantification_summary(summary_file=file_path)
failed_samples = 0
quantification_data: Dict[str, WellValues] = parse_quantification_summary(
summary_file=file_path
)
failed_samples: int = 0
for artifact in artifacts:
artifact_well = artifact.location[1]
artifact_well: str = artifact.location[1]
if artifact_well not in quantification_data.keys():
raise MissingValueError(
f"No values found for well {artifact_well} in the result file! "
Expand All @@ -163,12 +164,12 @@ def qpcr_dilution(
failed_samples += 1

if failed_samples:
error_message = (
error_message: str = (
f" {failed_samples} sample(s) failed the QC! See the logs for further information."
)
raise FailingQCError(error_message)

message = " Concentrations have been calculated and set for all samples!"
message: str = " Concentrations have been calculated and set for all samples!"
LOG.info(message)
click.echo(message)
except LimsError as e:
Expand Down

0 comments on commit 5494636

Please sign in to comment.