Skip to content

Commit

Permalink
Merge pull request #17 from snipe-bio/minor_fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
mr-eyes authored Oct 18, 2024
2 parents 1e11f0f + a93c717 commit d0b6542
Showing 11 changed files with 91 additions and 127 deletions.
Binary file removed src/.DS_Store
Binary file not shown.
Binary file removed src/snipe/.DS_Store
Binary file not shown.
4 changes: 4 additions & 0 deletions src/snipe/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
from snipe.api.snipe_sig import SnipeSig
from snipe.api.enums import SigType
from snipe.api.multisig_reference_QC import MultiSigReferenceQC
from snipe.api.reference_QC import ReferenceQC
76 changes: 53 additions & 23 deletions src/snipe/api/multisig_reference_QC.py
Original file line number Diff line number Diff line change
@@ -250,9 +250,9 @@ def __init__(self, *,
reference_sig: SnipeSig,
amplicon_sig: Optional[SnipeSig] = None,
ychr: Optional[SnipeSig] = None,
chr_to_sig: Optional[Dict[str, SnipeSig]] = None,
varsigs: Optional[List[SnipeSig]] = None,
enable_logging: bool = False,
export_varsigs: bool = False,
**kwargs):

# Initialize logger
@@ -293,17 +293,24 @@ def __init__(self, *,
self.logger.error("Invalid signature type for ychr: %s", ychr.sigtype)
raise ValueError(f"ychr must be of type {SigType.SAMPLE}, got {ychr.sigtype}")

self.specific_chr_to_sig: Optional[Dict[str, SnipeSig]] = None
self.specific_chr_to_sig: Optional[Dict[str, SnipeSig]] = reference_sig.chr_to_sig

if ychr is not None and chr_to_sig is not None:
chr_to_sig['sex-y'] = ychr
if ychr is not None and self.specific_chr_to_sig is not None:
self.logger.debug("Y chromosome signature provided and passed to the specific_kmers function.")
self.specific_chr_to_sig['sex-y'] = ychr

if chr_to_sig is not None:
self.logger.debug("Computing specific chromosome hashes for %s.", ','.join(chr_to_sig.keys()))
self.logger.debug(f"\t-All hashes for chromosomes before getting unique sigs {len(SnipeSig.sum_signatures(list(chr_to_sig.values())))}")
self.specific_chr_to_sig = SnipeSig.get_unique_signatures({sig_name: sig for sig_name, sig in chr_to_sig.items() if not sig_name.endswith("-snipegenome")})
if self.specific_chr_to_sig is not None:
self.logger.debug("Computing specific chromosome hashes for %s.", ','.join(self.specific_chr_to_sig.keys()))
self.logger.debug(f"\t-All hashes for chromosomes before getting unique sigs {len(SnipeSig.sum_signatures(list(self.specific_chr_to_sig.values())))}")
self.specific_chr_to_sig = SnipeSig.get_unique_signatures({sig_name: sig for sig_name, sig in self.specific_chr_to_sig.items() if not sig_name.endswith("-snipegenome")})
self.logger.debug(f"\t-All hashes for chromosomes after getting unique sigs {len(SnipeSig.sum_signatures(list(self.specific_chr_to_sig.values())))}")

# now remove the mitochondrial if present
# if "mitochondrial-M" in self.specific_chr_to_sig:
# self.specific_chr_to_sig.pop("mitochondrial-M")
# self.logger.debug("Removed mitochondrial-M from specific_chr_to_sig.")
# self.logger.debug(f"\t-All hashes for chromosomes after removing mitochondrial-M {len(SnipeSig.sum_signatures(list(self.specific_chr_to_sig.values())))}")

self.variance_sigs: Optional[List[SnipeSig]] = None
if varsigs is not None:
self.logger.debug("Variance signatures provided.")
@@ -326,6 +333,7 @@ def __init__(self, *,
self.reference_sig = reference_sig
self.amplicon_sig = amplicon_sig
self.enable_logging = enable_logging
self.export_varsigs = export_varsigs
self.sample_to_stats = {}


@@ -346,7 +354,7 @@ def process_sample(self, sample_sig: SnipeSig, predict_extra_folds: Optional[Lis
sex_stats: Dict[str, Any] = {}
predicted_error_contamination_index: Dict[str, Any] = {}
vars_nonref_stats: Dict[str, Any] = {}
chr_to_mean_abundance: Dict[str, float] = {}
chr_to_mean_abundance: Dict[str, np.float64] = {}
predicted_assay_type: str = "WGS"
roi_stats: Dict[str, Any] = {}

@@ -582,25 +590,36 @@ def sort_chromosomes(chr_name):
for chr_name in sorted(chr_to_mean_abundance, key=sort_chromosomes)
}

# Delete the mitochondrial from sorted_chr_to_mean_abundance
if "mitochondrial-M" in sorted_chr_to_mean_abundance:
self.logger.debug("Removing mitochondrial-M from sorted_chr_to_mean_abundance.")
sorted_chr_to_mean_abundance.pop("mitochondrial-M")

chrs_stats.update(sorted_chr_to_mean_abundance)

# chr_to_mean_abundance but without any chr with partial name sex
autosomal_chr_to_mean_abundance = {}
for chr_name, mean_abundance in chr_to_mean_abundance.items():
if "sex" in chr_name.lower() or "-snipegenome" in chr_name.lower():
if "sex" in chr_name.lower() or "-snipegenome" in chr_name.lower() or "mitochondrial" in chr_name.lower():
self.logger.debug("Skipping %s from autosomal_chr_to_mean_abundance.", chr_name)
continue

self.logger.debug("Adding %s to autosomal_chr_to_mean_abundance.", chr_name)
autosomal_chr_to_mean_abundance[chr_name] = mean_abundance


# calculate the CV for the whole sample
if autosomal_chr_to_mean_abundance:
mean_abundances = np.array(list(autosomal_chr_to_mean_abundance.values()), dtype=float)
mean_abundances = np.array(list(autosomal_chr_to_mean_abundance.values()), dtype=np.float64)
cv = np.std(mean_abundances) / np.mean(mean_abundances) if np.mean(mean_abundances) != 0 else 0.0
chrs_stats.update({"Autosomal_CV": cv})
assert "Autosomal_CV" in chrs_stats
self.logger.debug("Calculated Autosomal CV: %f", cv)
else:
self.logger.warning("No autosomal chromosomes were processed. 'Autosomal_CV' set to None.")
chrs_stats.update({"Autosomal_CV": None})
assert "Autosomal_CV" in chrs_stats


# ============= SEX STATS =============

@@ -609,9 +628,8 @@ def sort_chromosomes(chr_name):
self.logger.debug("Length of genome before subtracting sex chromosomes %s", len(self.reference_sig))
autosomals_genome_sig = self.reference_sig.copy()
for chr_name, chr_sig in self.specific_chr_to_sig.items():
if "sex" in chr_name.lower():
if "sex" in chr_name.lower() or "mitochondrial" in chr_name.lower():
self.logger.debug("Removing %s chromosome from the autosomal genome signature.", chr_name)
self.logger.debug("Type of autosomals_genome_sig: %s | Type of chr_sig: %s", autosomals_genome_sig.sigtype, chr_sig.sigtype)
self.logger.debug("Length of autosomals_genome_sig: %s | Length of chr_sig: %s", len(autosomals_genome_sig), len(chr_sig))
autosomals_genome_sig -= chr_sig
self.logger.debug("Length of genome after subtracting sex chromosomes %s", len(autosomals_genome_sig))
@@ -640,7 +658,7 @@ def sort_chromosomes(chr_name):
self.logger.debug("\t-Derived X chromosome-specific signature size: %d hashes.", len(specific_xchr_sig))

# Intersect the sample signature with chromosome-specific signatures
sample_specific_xchr_sig = sample_sig & specific_xchr_sig
sample_specific_xchr_sig = sample_sig & self.specific_chr_to_sig['sex-x']
if len(sample_specific_xchr_sig) == 0:
self.logger.warning("No X chromosome-specific k-mers found in the sample signature.")
self.logger.debug("\t-Intersected sample signature with X chromosome-specific k-mers = %d hashes.", len(sample_specific_xchr_sig))
@@ -656,11 +674,11 @@ def sort_chromosomes(chr_name):
self.logger.warning("Autosomal mean abundance is zero. Setting X-Ploidy score to zero to avoid division by zero.")
xploidy_score = 0.0
else:
xploidy_score = (xchr_mean_abundance / autosomal_mean_abundance) * \
(len(autosomals_genome_sig) / len(specific_xchr_sig) if len(specific_xchr_sig) > 0 else 0.0)

xploidy_score = (xchr_mean_abundance / autosomal_mean_abundance) if len(specific_xchr_sig) > 0 else 0.0

self.logger.debug("Calculated X-Ploidy score: %.4f", xploidy_score)
sex_stats.update({"X-Ploidy score": xploidy_score})
self.logger.debug("X-Ploidy score: %.4f", sex_stats["X-Ploidy score"])

# Calculate Y-Coverage if Y chromosome is present
if 'sex-y' in self.specific_chr_to_sig and 'sex-x' in self.specific_chr_to_sig:
@@ -684,8 +702,7 @@ def sort_chromosomes(chr_name):
self.logger.warning("Insufficient k-mers for Y-Coverage calculation. Setting Y-Coverage to zero.")
ycoverage = 0.0
else:
ycoverage = (len(ychr_in_sample) / len(ychr_specific_kmers)) / \
(len(sample_autosomal_sig) / len(autosomals_specific_kmers))
ycoverage = (len(ychr_in_sample) / len(ychr_specific_kmers)) / (len(sample_autosomal_sig) / len(autosomals_specific_kmers))

self.logger.debug("Calculated Y-Coverage: %.4f", ycoverage)
sex_stats.update({"Y-Coverage": ycoverage})
@@ -714,6 +731,15 @@ def sort_chromosomes(chr_name):
for variance_sig in self.variance_sigs:
variance_name = variance_sig.name
sample_nonref_var: SnipeSig = sample_nonref & variance_sig

if self.export_varsigs:
_export_var_name = variance_name.replace(' ','_').lower()
_export_sample_name = f"{sample_sig.name}_{_export_var_name}_nonref"
_export_name = _export_sample_name + '_' + _export_var_name
sample_nonref_var.name = _export_name
self.logger.debug("Exporting non-reference k-mers from variable '%s'.", variance_name)
sample_nonref_var.export(f"{sample_sig.name}_{variance_name}_nonref.zip")

sample_nonref_var_total_abundance = sample_nonref_var.total_abundance
sample_nonref_var_unique_hashes = len(sample_nonref_var)
sample_nonref_var_coverage_index = sample_nonref_var_unique_hashes / sample_nonref_unique_hashes
@@ -884,10 +910,14 @@ def saturation_model(x, a, b):
aggregated_stats.update(amplicon_stats)
if advanced_stats:
aggregated_stats.update(advanced_stats)
if chrs_stats:
aggregated_stats.update(chrs_stats)
if chrs_stats:
aggregated_stats.update(chrs_stats)
else:
self.logger.warning("No chromosome stats were processed.")
if sex_stats:
aggregated_stats.update(sex_stats)
else:
self.logger.warning("No sex-metrics stats were processed.")
if predicted_error_contamination_index:
aggregated_stats.update(predicted_error_contamination_index)
if vars_nonref_stats:
8 changes: 4 additions & 4 deletions src/snipe/api/reference_QC.py
Original file line number Diff line number Diff line change
@@ -1133,6 +1133,7 @@ def sort_chromosomes(chr_name):
for chr_name, mean_abundance in chr_to_mean_abundance.items():
if "sex" in chr_name.lower():
continue
self.logger.debug("Adding %s to autosomal chromosomes.", chr_name)
autosomal_chr_to_mean_abundance[chr_name] = mean_abundance


@@ -1144,7 +1145,7 @@ def sort_chromosomes(chr_name):
self.logger.debug("Calculated Autosomal CV: %f", cv)
else:
self.logger.warning("No autosomal chromosomes were processed. 'Autosomal_CV' set to None.")
self.chrs_stats.update({"Autosomal_CV": None})
self.sex.update({"Autosomal_CV": None})

# optional return, not required
return self.chrs_stats
@@ -1299,7 +1300,7 @@ def calculate_sex_chrs_metrics(self, genome_and_chr_to_sig: Dict[str, SnipeSig])
self.logger.debug("\t-Derived X chromosome-specific signature size: %d hashes.", len(specific_xchr_sig))

# Intersect the sample signature with chromosome-specific signatures
sample_specific_xchr_sig = self.sample_sig & specific_xchr_sig
sample_specific_xchr_sig = self.sample_sig & specific_chr_to_sig['sex-x'] # specific_xchr_sig
if len(sample_specific_xchr_sig) == 0:
self.logger.warning("No X chromosome-specific k-mers found in the sample signature.")
self.logger.debug("\t-Intersected sample signature with X chromosome-specific k-mers = %d hashes.", len(sample_specific_xchr_sig))
@@ -1315,8 +1316,7 @@ def calculate_sex_chrs_metrics(self, genome_and_chr_to_sig: Dict[str, SnipeSig])
self.logger.warning("Autosomal mean abundance is zero. Setting X-Ploidy score to zero to avoid division by zero.")
xploidy_score = 0.0
else:
xploidy_score = (xchr_mean_abundance / autosomal_mean_abundance) * \
(len(autosomals_genome_sig) / len(specific_xchr_sig) if len(specific_xchr_sig) > 0 else 0.0)
xploidy_score = (xchr_mean_abundance / autosomal_mean_abundance) if len(specific_xchr_sig) > 0 else 0.0

self.logger.debug("Calculated X-Ploidy score: %.4f", xploidy_score)
self.sex_stats.update({"X-Ploidy score": xploidy_score})
7 changes: 6 additions & 1 deletion src/snipe/api/sketch.py
Original file line number Diff line number Diff line change
@@ -358,12 +358,15 @@ def parse_fasta_header(self, header: str) -> Tuple[str, str]:
name = match.group(1) if match else "unknown"
elif stype in {"mitochondrial DNA", "chloroplast DNA"}:
name = stype.split()[0]
stype = name.lower()
elif stype == "chromosome":
match = re.search(r"(?:chromosome|chr)[_\s]*([^\s,]+)", header_lower)
if match:
name = match.group(1).rstrip(".,")
if name.upper() in {"X", "Y", "W", "Z"}:
stype = "sex"
elif name.upper() == "M":
stype = "mitochondrial"
else:
stype = "autosome"
elif stype == "reference chromosome":
@@ -417,9 +420,11 @@ def process_sequence(
with mh_lock:
mh_full.merge(current_mh)

if seq_type in {"sex", "autosome"}:
if seq_type in {"sex", "autosome", "mitochondrial"}:
with chr_lock:
key = f"{seq_type}-{seq_name}"
if seq_type == "mitochondrial":
key = "mitochondrial-M"
if key not in chr_to_mh:
chr_to_mh[key] = current_mh
else:
14 changes: 11 additions & 3 deletions src/snipe/api/snipe_sig.py
Original file line number Diff line number Diff line change
@@ -52,7 +52,7 @@ def __init__(self, *,
self.logger.setLevel(logging.CRITICAL)

# Initialize internal variables
self.logger.debug("Initializing SnipeSig with sourmash_sig: %s", sourmash_sig)
self.logger.debug("Initializing SnipeSig with sourmash_sig")

self._scale: int = None
self._ksize: int = None
@@ -124,12 +124,12 @@ def __init__(self, *,
elif signame.startswith("sex-"):
self.logger.debug("Found a sex chr signature %s", signame)
sig = sig.to_mutable()
# sig.name = signame.replace("sex-","")
self.chr_to_sig[sig.name] = SnipeSig(sourmash_sig=sig, sig_type=SigType.AMPLICON, enable_logging=enable_logging)
elif signame.startswith("autosome-"):
self.logger.debug("Found an autosome signature %s", signame)
sig = sig.to_mutable()
# sig.name = signame.replace("autosome-","")
self.chr_to_sig[sig.name] = SnipeSig(sourmash_sig=sig, sig_type=SigType.AMPLICON, enable_logging=enable_logging)
elif signame.startswith("mitochondrial-"):
self.chr_to_sig[sig.name] = SnipeSig(sourmash_sig=sig, sig_type=SigType.AMPLICON, enable_logging=enable_logging)
else:
continue
@@ -282,6 +282,14 @@ def sigtype(self, sigtype: SigType):
Set the type of the signature.
"""
self._type = sigtype

# setter for name
@name.setter
def name(self, name: str):
r"""
Set the name of the signature.
"""
self._name = name

@track_abundance.setter
def track_abundance(self, track_abundance: bool):
Loading

0 comments on commit d0b6542

Please sign in to comment.