Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Separating qc from parallel qc and enhancing processing #15

Merged
merged 13 commits into from
Oct 16, 2024
2 changes: 1 addition & 1 deletion .github/workflows/CI.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ jobs:
# Define a matrix of Python versions to test against
strategy:
matrix:
python-version: [3.11, 3.12]
python-version: [3.11, 3.12, 3.13]

steps:
# Step 1: Checkout the repository
Expand Down
901 changes: 901 additions & 0 deletions src/snipe/api/multisig_reference_QC.py

Large diffs are not rendered by default.

171 changes: 2 additions & 169 deletions src/snipe/api/reference_QC.py
Original file line number Diff line number Diff line change
Expand Up @@ -696,7 +696,8 @@ def split_sig_randomly(self, n: int) -> List[SnipeSig]:

self.logger.debug("No cached splits found for n=%d. Proceeding to split.", n)
# Get k-mers and abundances
hash_to_abund = dict(zip(self.sample_sig.hashes, self.sample_sig.abundances))
_sample_genome = self.sample_sig & self.reference_sig
hash_to_abund = dict(zip(_sample_genome.hashes, _sample_genome.abundances))
random_split_sigs = self.distribute_kmers_random(hash_to_abund, n)
split_sigs = [
SnipeSig.create_from_hashes_abundances(
Expand Down Expand Up @@ -1555,171 +1556,3 @@ def load_genome_sig_to_dict(self, *, zip_file_path: str, **kwargs) -> Dict[str,


return genome_chr_name_to_sig


class PreparedQC(ReferenceQC):
    r"""
    Class for quality control (QC) analysis of sample signature against prepared snipe profiles.

    Besides the behaviour inherited from `ReferenceQC`, this class manages a local
    database directory and can download prepared profiles from OSF into it.
    """

    # (connect, read) timeouts in seconds. The read timeout bounds the wait between
    # received bytes, not the total transfer time, so large files still download.
    _REQUEST_TIMEOUT = (10, 60)

    # Streaming chunk size in bytes; 1 KiB chunks are needlessly slow for large databases.
    _CHUNK_SIZE = 64 * 1024

    def __init__(self, *, sample_sig: SnipeSig, snipe_db_path: str = '~/.snipe/dbs/', ref_id: Optional[str] = None, amplicon_id: Optional[str] = None, enable_logging: bool = False, **kwargs):
        """
        Initialize the PreparedQC instance.

        **Parameters**

        - `sample_sig` (`SnipeSig`): The sample k-mer signature.
        - `snipe_db_path` (`str`): Path to the local Snipe database directory (`~` is expanded).
        - `ref_id` (`Optional[str]`): Reference identifier for selecting specific profiles.
        - `amplicon_id` (`Optional[str]`): Amplicon identifier; stored on the instance as `amplicon_id`.
        - `enable_logging` (`bool`): Flag to enable detailed logging.
        - `**kwargs`: Additional keyword arguments forwarded to `ReferenceQC.__init__`.
        """
        self.snipe_db_path = os.path.expanduser(snipe_db_path)
        self.ref_id = ref_id
        self.amplicon_id = amplicon_id

        # Ensure the local database directory exists
        os.makedirs(self.snipe_db_path, exist_ok=True)

        # Initialize without a reference signature for now; it can be set after downloading
        super().__init__(
            sample_sig=sample_sig,
            reference_sig=None,  # To be set after downloading
            enable_logging=enable_logging,
            **kwargs
        )

        # NOTE(review): `self.logger` is presumably created by ReferenceQC.__init__
        # (not visible here), so it is only used after the parent initializer has run.
        if enable_logging:
            self.logger.debug(f"Local Snipe DB path set to: {self.snipe_db_path}")
        else:
            self.logger.debug("Logging is disabled for PreparedQC.")

    def download_osf_db(self, url: str, save_path: str = '~/.snipe/dbs', force: bool = False) -> Optional[str]:
        """
        Download a file from OSF using the provided URL. The file is saved with its original name
        as specified by the OSF server via the Content-Disposition header.

        The payload is streamed to a temporary `<name>.part` file and moved into place only
        after the transfer completes, so an interrupted download never leaves a truncated
        file that a later call would mistake for a finished one.

        **Parameters**

        - `url` (`str`): The OSF URL to download the file from.
        - `save_path` (`str`): The directory path where the file will be saved. Supports user (~) and environment variables.
        - `force` (`bool`): If True, overwrite the file if it already exists. Default is False.

        **Returns**

        - `Optional[str]`: The path to the downloaded (or already existing) file.

        **Raises**

        - `requests.exceptions.RequestException`: If an error occurs during the HTTP request
          (including connect/read timeouts).
        - `Exception`: For any other exceptions that may arise.
        """
        try:
            # Expand user (~) and environment variables in save_path
            expanded_save_path = os.path.expanduser(os.path.expandvars(save_path))
            self.logger.debug(f"Expanded save path: {expanded_save_path}")

            # Ensure the download URL path ends with '/download'. Only the path component
            # is edited so that a query string or fragment stays where it belongs.
            parsed_url = urlparse(url)
            if not parsed_url.path.endswith('/download'):
                download_path = f"{parsed_url.path.rstrip('/')}/download"
                download_url = parsed_url._replace(path=download_path).geturl()
            else:
                download_url = url

            self.logger.debug(f"Download URL: {download_url}")

            # Ensure the save directory exists
            os.makedirs(expanded_save_path, exist_ok=True)
            self.logger.debug(f"Save path verified/created: {expanded_save_path}")

            # Initiate the GET request with streaming
            with requests.get(
                download_url,
                stream=True,
                allow_redirects=True,
                timeout=self._REQUEST_TIMEOUT,
            ) as response:
                response.raise_for_status()  # Raise an exception for HTTP errors

                # Attempt to extract filename from Content-Disposition
                content_disposition = response.headers.get('Content-Disposition')
                filename = self._extract_filename(content_disposition, parsed_url.path)
                self.logger.debug(f"Filename determined: {filename}")

                # Define the full save path
                full_save_path = os.path.join(expanded_save_path, filename)
                self.logger.debug(f"Full save path: {full_save_path}")

                # Check if the file already exists
                if os.path.exists(full_save_path):
                    if force:
                        self.logger.info(f"Overwriting existing file: {full_save_path}")
                    else:
                        self.logger.info(f"File already exists: {full_save_path}. Skipping download.")
                        return full_save_path

                # Total size for the progress bar; None lets tqdm show an open-ended
                # counter when the server does not send Content-Length.
                total_size = int(response.headers.get('Content-Length', 0)) or None

                partial_path = f"{full_save_path}.part"
                try:
                    with open(partial_path, 'wb') as file, tqdm(
                        total=total_size,
                        unit='B',
                        unit_scale=True,
                        unit_divisor=1024,
                        desc=filename,
                        ncols=100
                    ) as bar:
                        for chunk in response.iter_content(chunk_size=self._CHUNK_SIZE):
                            if chunk:  # Filter out keep-alive chunks
                                file.write(chunk)
                                bar.update(len(chunk))
                    # Publish the finished file in one step.
                    os.replace(partial_path, full_save_path)
                finally:
                    # After a successful os.replace the partial file no longer exists;
                    # on any failure it is removed so no truncated data is left behind.
                    if os.path.exists(partial_path):
                        os.remove(partial_path)

            self.logger.info(f"File downloaded successfully: {full_save_path}")
            return full_save_path

        except requests.exceptions.RequestException as req_err:
            self.logger.error(f"Request error occurred while downloading {url}: {req_err}")
            raise
        except Exception as e:
            self.logger.error(f"An unexpected error occurred while downloading {url}: {e}")
            raise

    def _extract_filename(self, content_disposition: Optional[str], url_path: str) -> str:
        """
        Extract filename from Content-Disposition header or fallback to URL path.

        The extended `filename*=` parameter (RFC 5987 / RFC 6266) is preferred over the
        plain `filename=` parameter when both are present, and its value is
        percent-decoded. The result is always reduced to a bare file name, because the
        header is server-controlled and must not be able to steer the write outside
        the save directory.

        **Parameters**

        - `content_disposition` (`Optional[str]`): The Content-Disposition header value.
        - `url_path` (`str`): The path component of the URL.

        **Returns**

        - `str`: The extracted filename; `'downloaded_file'` if nothing usable is found.
        """
        from urllib.parse import unquote  # local import: only needed for RFC 5987 values

        plain_name = None
        extended_name = None
        if content_disposition:
            self.logger.debug("Parsing Content-Disposition header for filename.")
            for part in content_disposition.split(';'):
                key, separator, value = part.strip().partition('=')
                if not separator:
                    continue  # e.g. the bare disposition type "attachment"
                key = key.strip().lower()
                value = value.strip()

                if key == 'filename*' and extended_name is None:
                    # RFC 5987 form: charset'language'percent-encoded-value
                    pieces = value.strip('"').split("'", 2)
                    if len(pieces) == 3:
                        charset, _language, encoded = pieces
                    else:
                        charset, encoded = '', value
                    try:
                        extended_name = unquote(encoded, encoding=charset or 'utf-8', errors='replace')
                    except LookupError:
                        # Unknown charset label: fall back to UTF-8 decoding.
                        extended_name = unquote(encoded)
                    self.logger.debug(f"Filename extracted from headers (RFC 5987): {extended_name}")
                elif key == 'filename' and plain_name is None:
                    # Remove any surrounding quotes
                    plain_name = value.strip(' "')
                    self.logger.debug(f"Filename extracted from headers: {plain_name}")

        filename = extended_name or plain_name
        if filename:
            # Drop any directory components (either separator style) supplied by the server.
            filename = os.path.basename(filename.replace('\\', '/'))
            if filename in ('', '.', '..'):
                filename = None

        if not filename:
            self.logger.debug("Falling back to filename derived from URL path.")
            filename = os.path.basename(url_path)
            if not filename or filename in ('.', '..'):
                filename = 'downloaded_file'
            self.logger.debug(f"Filename derived from URL: {filename}")

        return filename


42 changes: 27 additions & 15 deletions src/snipe/api/snipe_sig.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
import heapq
import logging

import sourmash.save_load
from snipe.api.enums import SigType
from typing import Any, Dict, Iterator, List, Optional, Union
from typing import Dict, Iterator, List, Union, Optional
import numpy as np
import sourmash
import os
Expand All @@ -18,8 +17,9 @@ class SnipeSig:
such as customized set operations and abundance management.
"""

def __init__(self, *, sourmash_sig: Union[str, sourmash.signature.SourmashSignature, sourmash.signature.FrozenSourmashSignature],
ksize: int = 51, scale: int = 10000, sig_type=SigType.SAMPLE, enable_logging: bool = False, **kwargs):
def __init__(self, *,
sourmash_sig: Union[str, sourmash.signature.SourmashSignature, sourmash.signature.FrozenSourmashSignature],
sig_type=SigType.SAMPLE, enable_logging: bool = False, **kwargs):
r"""
Initialize the SnipeSig with a sourmash signature object or a path to a signature.

Expand Down Expand Up @@ -54,15 +54,15 @@ def __init__(self, *, sourmash_sig: Union[str, sourmash.signature.SourmashSignat
# Initialize internal variables
self.logger.debug("Initializing SnipeSig with sourmash_sig: %s", sourmash_sig)

self._scale = scale
self._ksize = ksize
self._md5sum = None
self._scale: int = None
self._ksize: int = None
self._md5sum: str = None
self._hashes = np.array([], dtype=np.uint64)
self._abundances = np.array([], dtype=np.uint32)
self._type = sig_type
self._name = None
self._filename = None
self._track_abundance = False
self._type: SigType = sig_type
self._name: str = None
self._filename: str = None
self._track_abundance: bool = True

sourmash_sigs: Dict[str, sourmash.signature.SourmashSignature] = {}
_sourmash_sig: Union[sourmash.signature.SourmashSignature, sourmash.signature.FrozenSourmashSignature] = None
Expand Down Expand Up @@ -117,18 +117,19 @@ def __init__(self, *, sourmash_sig: Union[str, sourmash.signature.SourmashSignat
self.logger.debug(f"Iterating over signature: {signame}")
if signame.endswith("-snipegenome"):
sig = sig.to_mutable()
# self.chr_to_sig[sig.name] = SnipeSig(sourmash_sig=sig, sig_type=SigType.GENOME, enable_logging=enable_logging)
sig.name = sig.name.replace("-snipegenome", "")
self.logger.debug("Found a genome signature with the snipe suffix `-snipegenome`. Restoring original name `%s`.", sig.name)
_sourmash_sig = sig
elif signame.startswith("sex-"):
self.logger.debug("Found a sex chr signature %s", signame)
sig = sig.to_mutable()
sig.name = signame.replace("sex-","")
# sig.name = signame.replace("sex-","")
self.chr_to_sig[sig.name] = SnipeSig(sourmash_sig=sig, sig_type=SigType.AMPLICON, enable_logging=enable_logging)
elif signame.startswith("autosome-"):
self.logger.debug("Found an autosome signature %s", signame)
sig = sig.to_mutable()
sig.name = signame.replace("autosome-","")
# sig.name = signame.replace("autosome-","")
self.chr_to_sig[sig.name] = SnipeSig(sourmash_sig=sig, sig_type=SigType.AMPLICON, enable_logging=enable_logging)
else:
continue
Expand Down Expand Up @@ -281,6 +282,13 @@ def sigtype(self, sigtype: SigType):
Set the type of the signature.
"""
self._type = sigtype

@track_abundance.setter
def track_abundance(self, track_abundance: bool):
    r"""
    Update the flag recording whether this signature tracks abundance.

    **Parameters**

    - `track_abundance` (`bool`): New value stored in `_track_abundance`.
    """
    self._track_abundance = track_abundance

def get_info(self) -> dict:
r"""
Expand Down Expand Up @@ -490,7 +498,10 @@ def _convert_to_sourmash_signature(self):
self.logger.debug("Converting SnipeSig to sourmash.signature.SourmashSignature.")

mh = sourmash.minhash.MinHash(n=0, ksize=self._ksize, scaled=self._scale, track_abundance=self._track_abundance)
mh.set_abundances(dict(zip(self._hashes, self._abundances)))
if self._track_abundance:
mh.set_abundances(dict(zip(self._hashes, self._abundances)))
else:
mh.add_many(self._hashes)
self.sourmash_sig = sourmash.signature.SourmashSignature(mh, name=self._name, filename=self._filename)
self.logger.debug("Conversion to sourmash.signature.SourmashSignature completed.")

Expand All @@ -516,7 +527,7 @@ def export(self, path, force=False) -> None:
with sourmash.save_load.SaveSignatures_ZipFile(path) as save_sigs:
save_sigs.add(self.sourmash_sig)
except Exception as e:
logging.error("Failed to export signatures to zip: %s", e)
self.logger.error("Failed to export signatures to zip: %s", e)
raise Exception(f"Failed to export signatures to zip: {e}") from e
else:
raise ValueError("Output file must be either a .sig or .zip file.")
Expand Down Expand Up @@ -1273,6 +1284,7 @@ def reset_abundance(self, new_abundance: int = 1):
self._validate_abundance_operation(new_abundance, "reset abundance")

self._abundances[:] = new_abundance
self.track_abundance = True
self.logger.debug("Reset all abundances to %d.", new_abundance)

def keep_min_abundance(self, min_abundance: int):
Expand Down
Loading
Loading