Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Separating qc from parallel qc and enhancing processing #15

Merged
merged 13 commits into from
Oct 16, 2024
Prev Previous commit
Next Next commit
little enhancement
  • Loading branch information
mr-eyes committed Oct 15, 2024
commit 60f6139112c57fc9315b1f5e0023842d40780a70
68 changes: 29 additions & 39 deletions src/snipe/cli/cli_qc.py
Original file line number Diff line number Diff line change
@@ -16,6 +16,7 @@

import signal


def validate_sig_input(ctx, param, value: tuple) -> str:
supported_extensions = ['.zip', '.sig']
for path in value:
@@ -24,10 +25,10 @@ def validate_sig_input(ctx, param, value: tuple) -> str:
if not any(path.lower().endswith(ext) for ext in supported_extensions):
raise click.BadParameter(f"Unsupported file format: {path}, supported formats: {', '.join(supported_extensions)}")

def process_sample(sample_path: str, ref_path: str, amplicon_path: Optional[str],
def process_sample(sample_path: str, reference_sig: SnipeSig, amplicon_sig: Optional[SnipeSig],
advanced: bool, roi: bool, debug: bool,
ychr: Optional[str] = None,
vars_paths: Optional[Union[List[str], List[SnipeSig]]] = None) -> Dict[str, Any]:
ychr_sig: Optional[SnipeSig] = None,
var_sigs: Optional[List[SnipeSig]] = None) -> Dict[str, Any]:
"""
Process a single sample for QC.

@@ -59,16 +60,6 @@ def process_sample(sample_path: str, ref_path: str, amplicon_path: Optional[str]
sample_sig = SnipeSig(sourmash_sig=sample_path, sig_type=SigType.SAMPLE, enable_logging=debug)
logger.debug(f"Loaded sample signature: {sample_sig.name}")

# Load reference signature
reference_sig = SnipeSig(sourmash_sig=ref_path, sig_type=SigType.GENOME, enable_logging=debug)
logger.debug(f"Loaded reference signature: {reference_sig.name}")

# Load amplicon signature if provided
amplicon_sig = None
if amplicon_path:
amplicon_sig = SnipeSig(sourmash_sig=amplicon_path, sig_type=SigType.AMPLICON, enable_logging=debug)
logger.debug(f"Loaded amplicon signature: {amplicon_sig.name}")

# Instantiate ReferenceQC
qc_instance = ReferenceQC(
sample_sig=sample_sig,
@@ -78,28 +69,17 @@ def process_sample(sample_path: str, ref_path: str, amplicon_path: Optional[str]
)

# Load variable signatures if provided
if vars_paths:
qc_instance.logger.debug(f"Loading {len(vars_paths)} variable signature(s).")
vars_dict: Dict[str, SnipeSig] = {sig.name: sig for sig in vars_paths} if isinstance(vars_paths[0], SnipeSig) else {}
if var_sigs:
qc_instance.logger.debug(f"Loading {len(var_sigs)} variable signature(s).")
vars_dict: Dict[str, SnipeSig] = {sig.name: sig for sig in var_sigs}
qc_instance.logger.debug(f"vars_dict: {vars_dict}")
qc_instance.logger.debug(f"vars_paths: {vars_paths}")
vars_order: List[str] = []

if not vars_dict:
qc_instance.logger.debug("Loading variable signature(s) from file paths.")
for path in vars_paths:
qc_instance.logger.debug(f"Loading variable signature from: {path}")
var_sig = SnipeSig(sourmash_sig=path, sig_type=SigType.AMPLICON, enable_logging=debug)
var_name = var_sig.name if var_sig.name else os.path.basename(path)
vars_order.append(var_name)
vars_dict[var_name] = var_sig

qc_instance.logger.debug("Loading variable signature(s) from SnipeSig objects.")
for snipe_name in vars_dict.keys():
vars_order.append(snipe_name)
qc_instance.logger.debug(f"Loaded variable signature: {snipe_name}")

else:
qc_instance.logger.debug("Loading variable signature(s) from SnipeSig objects.")
for snipe_name in vars_dict.keys():
vars_order.append(snipe_name)
qc_instance.logger.debug(f"Loaded variable signature: {snipe_name}")

# log keys of vars_dict and vars_order
qc_instance.logger.debug(f"vars_dict keys: {vars_dict.keys()}")
qc_instance.logger.debug(f"vars_order: {vars_order}")
@@ -111,8 +91,7 @@ def process_sample(sample_path: str, ref_path: str, amplicon_path: Optional[str]
# Calculate chromosome metrics
# genome_chr_to_sig: Dict[str, SnipeSig] = qc_instance.load_genome_sig_to_dict(zip_file_path = ref_path)
chr_to_sig = reference_sig.chr_to_sig.copy()
if ychr:
ychr_sig = SnipeSig(sourmash_sig=ychr, sig_type=SigType.GENOME, enable_logging=debug)
if ychr_sig:
chr_to_sig['y'] = ychr_sig

qc_instance.calculate_chromosome_metrics(chr_to_sig)
@@ -492,6 +471,17 @@ def qc(ref: str, sample: List[str], samples_from_file: Optional[str],
except Exception as e:
logger.error(f"Failed to load amplicon signature from {amplicon}: {e}")
sys.exit(1)

# Load Y chromosome signature if provided
ychr_sig = None
if ychr:
logger.info(f"Loading Y chromosome signature from: {ychr}")
try:
ychr_sig = SnipeSig(sourmash_sig=ychr, sig_type=SigType.GENOME, enable_logging=debug)
logger.debug(f"Loaded Y chromosome signature: {ychr_sig.name}")
except Exception as e:
logger.error(f"Failed to load Y chromosome signature from {ychr}: {e}")
sys.exit(1)

# Prepare variable signatures if provided
vars_paths = []
@@ -517,15 +507,15 @@ def qc(ref: str, sample: List[str], samples_from_file: Optional[str],
for sample_path in valid_samples:
dict_process_args.append({
"sample_path": sample_path,
"ref_path": ref,
"amplicon_path": amplicon,
"reference_sig": reference_sig,
"amplicon_sig": amplicon_sig,
"advanced": advanced,
"roi": roi,
"debug": debug,
"ychr": ychr,
"vars_paths": vars_snipesigs #vars_paths
"ychr_sig": ychr_sig,
"var_sigs": vars_snipesigs #vars_paths
})

results = []

# Define a handler for graceful shutdown