Skip to content

Commit

Permalink
Merge pull request #18 from snipe-bio/ops_guided_merge
Browse files Browse the repository at this point in the history
  • Loading branch information
mr-eyes authored Oct 20, 2024
2 parents 76333dd + d426e1b commit 1bdc32b
Showing 1 changed file with 206 additions and 3 deletions.
209 changes: 206 additions & 3 deletions src/snipe/cli/cli_ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import sys
import logging
from typing import List

from collections import defaultdict
import click

from snipe.api.enums import SigType
Expand Down Expand Up @@ -411,7 +411,7 @@ def sum(ctx, sig_files, sigs_from_file, reset_abundance, trim_singletons,
def intersect(ctx, sig_files, sigs_from_file, reset_abundance, trim_singletons,
min_abund, max_abund, trim_below_median, output_file, name, debug, force):
"""
Compute the intersection of multiple signatures, retaining only common hashes.
Intersect multiple sigs and retain abundance of first one.
This command identifies hashes that are present in **all** input signatures and
retains their abundance from the first signature.
Expand Down Expand Up @@ -649,7 +649,7 @@ def subtract(ctx, sig_files, sigs_from_file, reset_abundance, trim_singletons,
def union(ctx, sig_files, sigs_from_file, reset_abundance, trim_singletons,
min_abund, max_abund, trim_below_median, output_file, name, debug, force):
"""
Merge multiple signatures by taking the union of their hashes.
Merge multiple signatures by taking the union of hashes.
This command combines multiple signatures, retaining all unique hashes from each.
If a hash appears in multiple signatures, its abundance in the resulting signature
Expand Down Expand Up @@ -868,3 +868,206 @@ def common(ctx, sig_files, sigs_from_file, reset_abundance, trim_singletons,
logger.error(f"Failed to export common hashes signature: {e}")
click.echo(f"Error: Failed to export common hashes signature: {e}", err=True)
sys.exit(1)

import csv

@ops.command()
@click.option('--table', '-t', type=click.Path(exists=True, dir_okay=False), required=True, help='Tabular file (CSV or TSV) with two columns: <signature_path> <experiment_name>')
@click.option('--output-dir', '-d', type=click.Path(file_okay=False), required=True, help='Directory to save merged signature files.')
@click.option('--reset-abundance', is_flag=True, default=False, help='Reset abundance for all input signatures to 1.')
@click.option('--trim-singletons', is_flag=True, default=False, help='Trim singletons from all input signatures.')
@click.option('--min-abund', type=int, help='Keep hashes with abundance >= this value.')
@click.option('--max-abund', type=int, help='Keep hashes with abundance <= this value.')
@click.option('--trim-below-median', is_flag=True, default=False, help='Trim hashes below the median abundance.')
@click.option('--debug', is_flag=True, default=False, help='Enable debugging and detailed logging.')
@click.option('--force', is_flag=True, default=False, help='Overwrite existing files in the output directory.')
@click.pass_context
def guided_merge(ctx, table, output_dir, reset_abundance, trim_singletons,
min_abund, max_abund, trim_below_median, debug, force):
"""
Guide signature merging by groups.
This command reads a table file (CSV or TSV) where each line contains a signature file path and an experiment name.
It groups signatures by experiment, applies specified operations, sums the signatures within each group,
and saves the merged signatures as `{output_dir}/{experiment_name}.zip`.
**Example:**
snipe ops guided_merge --table mapping.tsv --output-dir merged_sigs --reset-abundance --force
**Example Table File (`mapping.tsv`):**
/path/to/sig1.zip exp1
/path/to/sig2.zip exp1
/path/to/sig3.zip exp2
/path/to/sig4.zip exp2
/path/to/sig5.zip exp3
"""
# Setup logging
logger = logging.getLogger('ops.guided_merge')
if debug:
logger.setLevel(logging.DEBUG)
else:
logger.setLevel(logging.CRITICAL)

# Function to detect delimiter
def detect_delimiter(file_path):
with open(file_path, 'r', newline='') as csvfile:
sample = csvfile.read(1024)
sniffer = csv.Sniffer()
try:
dialect = sniffer.sniff(sample, delimiters='\t,')
delimiter = dialect.delimiter
logger.debug(f"Detected delimiter: '{delimiter}'")
return delimiter
except csv.Error:
logger.warning("Could not detect delimiter. Defaulting to tab.")
return '\t'

# Initialize counters and mapping
total_mapped = 0
total_valid = 0
total_invalid = 0
experiment_mapping = defaultdict(list)
invalid_files = []

# Detect delimiter
delimiter = detect_delimiter(table)

# Parse the table file
try:
with open(table, 'r', newline='') as tbl_file:
reader = csv.reader(tbl_file, delimiter=delimiter)
for line_num, row in enumerate(reader, start=1):
if not row:
logger.debug(f"Line {line_num}: Empty line. Skipping.")
continue # Skip empty lines
if len(row) < 2:
logger.warning(f"Line {line_num}: Invalid format. Expected 2 columns, got {len(row)}. Skipping.")
total_invalid += 1
continue
sig_path, exp_name = row[0].strip(), row[1].strip()
if not sig_path or not exp_name:
logger.warning(f"Line {line_num}: Missing signature path or experiment name. Skipping.")
total_invalid += 1
continue
total_mapped += 1
if not os.path.isfile(sig_path):
logger.warning(f"Line {line_num}: Signature file does not exist: {sig_path}. Skipping.")
invalid_files.append(sig_path)
total_invalid += 1
continue
experiment_mapping[exp_name].append(sig_path)
total_valid += 1
except Exception as e:
logger.error(f"Failed to read table file {table}: {e}")
click.echo(f"Error: Failed to read table file {table}: {e}", err=True)
sys.exit(1)

if total_valid == 0:
logger.error("No valid signature files found in the table. Exiting.")
click.echo("Error: No valid signature files found in the table. Exiting.", err=True)
sys.exit(1)

logger.debug(f"Total lines in table: {total_mapped + total_invalid}")
logger.debug(f"Total valid signatures: {total_valid}")
logger.debug(f"Total invalid signatures: {total_invalid}")
logger.debug(f"Experiments found: {len(experiment_mapping)}")

# Create the output directory if it does not exist
if not os.path.exists(output_dir):
try:
os.makedirs(output_dir)
logger.debug(f"Created output directory: {output_dir}")
except Exception as e:
logger.error(f"Failed to create output directory {output_dir}: {e}")
click.echo(f"Error: Failed to create output directory {output_dir}: {e}", err=True)
sys.exit(1)
else:
if not os.path.isdir(output_dir):
logger.error(f"Output path {output_dir} exists and is not a directory.")
click.echo(f"Error: Output path {output_dir} exists and is not a directory.", err=True)
sys.exit(1)

# Process each experiment
summary = []
for exp_name, sig_paths in experiment_mapping.items():
logger.debug(f"Processing experiment '{exp_name}' with {len(sig_paths)} signatures.")
# Load signatures
signatures = load_signatures(sig_paths, logger, allow_duplicates=False)
if not signatures:
logger.warning(f"No valid signatures loaded for experiment '{exp_name}'. Skipping.")
continue

# Define operation context
operations = []
if reset_abundance:
operations.append(('reset_abundance', None))
if trim_singletons:
operations.append(('trim_singletons', None))
if min_abund is not None:
operations.append(('keep_min_abundance', min_abund))
if max_abund is not None:
operations.append(('keep_max_abundance', max_abund))
if trim_below_median:
operations.append(('trim_below_median', None))

logger.debug(f"Operations to apply for experiment '{exp_name}': {operations}")

# Apply operations
apply_operations(signatures, operations, logger)

# Sum signatures
try:
merged_signature = SnipeSig.sum_signatures(
signatures,
name=exp_name,
filename=None,
enable_logging=debug
)
logger.debug(f"Merged signature created for experiment '{exp_name}' with name '{merged_signature.name}'.")
except Exception as e:
logger.error(f"Failed to merge signatures for experiment '{exp_name}': {e}")
click.echo(f"Error: Failed to merge signatures for experiment '{exp_name}': {e}", err=True)
continue

# Define output file path
output_file_path = os.path.join(output_dir, f"{exp_name}.zip")

# Check if output file exists
if os.path.exists(output_file_path) and not force:
logger.error(f"Output file '{output_file_path}' already exists. Use --force to overwrite.")
click.echo(f"Error: Output file '{output_file_path}' already exists. Use --force to overwrite.", err=True)
continue

# Export the merged signature
try:
merged_signature.export(output_file_path)
click.echo(f"Merged signature for experiment '{exp_name}' exported to {output_file_path}")
logger.info(f"Merged signature for experiment '{exp_name}' exported to {output_file_path}")
summary.append((exp_name, len(sig_paths), output_file_path))
except FileExistsError:
logger.error(f"Output file '{output_file_path}' already exists. Use --force to overwrite.")
click.echo(f"Error: Output file '{output_file_path}' already exists. Use --force to overwrite.", err=True)
continue
except Exception as e:
logger.error(f"Failed to export merged signature for experiment '{exp_name}': {e}")
click.echo(f"Error: Failed to export merged signature for experiment '{exp_name}': {e}", err=True)
continue

# Summary Report
click.echo("\nGuided Merge Summary:")
click.echo(f"Total signatures mapped in table: {total_mapped}")
click.echo(f"Valid signatures processed: {total_valid}")
click.echo(f"Invalid or missing signatures skipped: {total_invalid}")
if invalid_files:
click.echo("List of invalid or missing signature files:")
for invalid in invalid_files:
click.echo(f" - {invalid}")
click.echo(f"Total experiments merged: {len(summary)}")
for exp_name, count, path in summary:
click.echo(f" - Experiment '{exp_name}': {count} signatures merged into {path}")

if len(summary) < len(experiment_mapping):
skipped = len(experiment_mapping) - len(summary)
click.echo(f"Skipped {skipped} experiments due to errors or missing signatures.")

0 comments on commit 1bdc32b

Please sign in to comment.