diff --git a/src/snipe/cli/cli_ops.py b/src/snipe/cli/cli_ops.py index 2d55242..81caf5a 100644 --- a/src/snipe/cli/cli_ops.py +++ b/src/snipe/cli/cli_ops.py @@ -4,7 +4,7 @@ import sys import logging from typing import List - +from collections import defaultdict import click from snipe.api.enums import SigType @@ -411,7 +411,7 @@ def sum(ctx, sig_files, sigs_from_file, reset_abundance, trim_singletons, def intersect(ctx, sig_files, sigs_from_file, reset_abundance, trim_singletons, min_abund, max_abund, trim_below_median, output_file, name, debug, force): """ - Compute the intersection of multiple signatures, retaining only common hashes. + Intersect multiple sigs and retain abundance of first one. This command identifies hashes that are present in **all** input signatures and retains their abundance from the first signature. @@ -649,7 +649,7 @@ def subtract(ctx, sig_files, sigs_from_file, reset_abundance, trim_singletons, def union(ctx, sig_files, sigs_from_file, reset_abundance, trim_singletons, min_abund, max_abund, trim_below_median, output_file, name, debug, force): """ - Merge multiple signatures by taking the union of their hashes. + Merge multiple signatures by taking the union of hashes. This command combines multiple signatures, retaining all unique hashes from each. If a hash appears in multiple signatures, its abundance in the resulting signature @@ -868,3 +868,206 @@ def common(ctx, sig_files, sigs_from_file, reset_abundance, trim_singletons, logger.error(f"Failed to export common hashes signature: {e}") click.echo(f"Error: Failed to export common hashes signature: {e}", err=True) sys.exit(1) + +import csv + +@ops.command() +@click.option('--table', '-t', type=click.Path(exists=True, dir_okay=False), required=True, help='Tabular file (CSV or TSV) with two columns: ') +@click.option('--output-dir', '-d', type=click.Path(file_okay=False), required=True, help='Directory to save merged signature files.') +@click.option('--reset-abundance', is_flag=True, default=False, help='Reset abundance for all input signatures to 1.') +@click.option('--trim-singletons', is_flag=True, default=False, help='Trim singletons from all input signatures.') +@click.option('--min-abund', type=int, help='Keep hashes with abundance >= this value.') +@click.option('--max-abund', type=int, help='Keep hashes with abundance <= this value.') +@click.option('--trim-below-median', is_flag=True, default=False, help='Trim hashes below the median abundance.') +@click.option('--debug', is_flag=True, default=False, help='Enable debugging and detailed logging.') +@click.option('--force', is_flag=True, default=False, help='Overwrite existing files in the output directory.') +@click.pass_context +def guided_merge(ctx, table, output_dir, reset_abundance, trim_singletons, + min_abund, max_abund, trim_below_median, debug, force): + """ + Guide signature merging by groups. + + This command reads a table file (CSV or TSV) where each line contains a signature file path and an experiment name. + It groups signatures by experiment, applies specified operations, sums the signatures within each group, + and saves the merged signatures as `{output_dir}/{experiment_name}.zip`. + + **Example:** + + snipe ops guided_merge --table mapping.tsv --output-dir merged_sigs --reset-abundance --force + + **Example Table File (`mapping.tsv`):** + + /path/to/sig1.zip exp1 + /path/to/sig2.zip exp1 + /path/to/sig3.zip exp2 + /path/to/sig4.zip exp2 + /path/to/sig5.zip exp3 + """ + # Setup logging + logger = logging.getLogger('ops.guided_merge') + if debug: + logger.setLevel(logging.DEBUG) + else: + logger.setLevel(logging.CRITICAL) + + # Function to detect delimiter + def detect_delimiter(file_path): + with open(file_path, 'r', newline='') as csvfile: + sample = csvfile.read(1024) + sniffer = csv.Sniffer() + try: + dialect = sniffer.sniff(sample, delimiters='\t,') + delimiter = dialect.delimiter + logger.debug(f"Detected delimiter: '{delimiter}'") + return delimiter + except csv.Error: + logger.warning("Could not detect delimiter. Defaulting to tab.") + return '\t' + + # Initialize counters and mapping + total_mapped = 0 + total_valid = 0 + total_invalid = 0 + experiment_mapping = defaultdict(list) + invalid_files = [] + + # Detect delimiter + delimiter = detect_delimiter(table) + + # Parse the table file + try: + with open(table, 'r', newline='') as tbl_file: + reader = csv.reader(tbl_file, delimiter=delimiter) + for line_num, row in enumerate(reader, start=1): + if not row: + logger.debug(f"Line {line_num}: Empty line. Skipping.") + continue # Skip empty lines + if len(row) < 2: + logger.warning(f"Line {line_num}: Invalid format. Expected 2 columns, got {len(row)}. Skipping.") + total_invalid += 1 + continue + sig_path, exp_name = row[0].strip(), row[1].strip() + if not sig_path or not exp_name: + logger.warning(f"Line {line_num}: Missing signature path or experiment name. Skipping.") + total_invalid += 1 + continue + total_mapped += 1 + if not os.path.isfile(sig_path): + logger.warning(f"Line {line_num}: Signature file does not exist: {sig_path}. Skipping.") + invalid_files.append(sig_path) + total_invalid += 1 + continue + experiment_mapping[exp_name].append(sig_path) + total_valid += 1 + except Exception as e: + logger.error(f"Failed to read table file {table}: {e}") + click.echo(f"Error: Failed to read table file {table}: {e}", err=True) + sys.exit(1) + + if total_valid == 0: + logger.error("No valid signature files found in the table. Exiting.") + click.echo("Error: No valid signature files found in the table. Exiting.", err=True) + sys.exit(1) + + logger.debug(f"Total lines in table: {total_mapped + total_invalid}") + logger.debug(f"Total valid signatures: {total_valid}") + logger.debug(f"Total invalid signatures: {total_invalid}") + logger.debug(f"Experiments found: {len(experiment_mapping)}") + + # Create the output directory if it does not exist + if not os.path.exists(output_dir): + try: + os.makedirs(output_dir) + logger.debug(f"Created output directory: {output_dir}") + except Exception as e: + logger.error(f"Failed to create output directory {output_dir}: {e}") + click.echo(f"Error: Failed to create output directory {output_dir}: {e}", err=True) + sys.exit(1) + else: + if not os.path.isdir(output_dir): + logger.error(f"Output path {output_dir} exists and is not a directory.") + click.echo(f"Error: Output path {output_dir} exists and is not a directory.", err=True) + sys.exit(1) + + # Process each experiment + summary = [] + for exp_name, sig_paths in experiment_mapping.items(): + logger.debug(f"Processing experiment '{exp_name}' with {len(sig_paths)} signatures.") + # Load signatures + signatures = load_signatures(sig_paths, logger, allow_duplicates=False) + if not signatures: + logger.warning(f"No valid signatures loaded for experiment '{exp_name}'. Skipping.") + continue + + # Define operation context + operations = [] + if reset_abundance: + operations.append(('reset_abundance', None)) + if trim_singletons: + operations.append(('trim_singletons', None)) + if min_abund is not None: + operations.append(('keep_min_abundance', min_abund)) + if max_abund is not None: + operations.append(('keep_max_abundance', max_abund)) + if trim_below_median: + operations.append(('trim_below_median', None)) + + logger.debug(f"Operations to apply for experiment '{exp_name}': {operations}") + + # Apply operations + apply_operations(signatures, operations, logger) + + # Sum signatures + try: + merged_signature = SnipeSig.sum_signatures( + signatures, + name=exp_name, + filename=None, + enable_logging=debug + ) + logger.debug(f"Merged signature created for experiment '{exp_name}' with name '{merged_signature.name}'.") + except Exception as e: + logger.error(f"Failed to merge signatures for experiment '{exp_name}': {e}") + click.echo(f"Error: Failed to merge signatures for experiment '{exp_name}': {e}", err=True) + continue + + # Define output file path + output_file_path = os.path.join(output_dir, f"{exp_name}.zip") + + # Check if output file exists + if os.path.exists(output_file_path) and not force: + logger.error(f"Output file '{output_file_path}' already exists. Use --force to overwrite.") + click.echo(f"Error: Output file '{output_file_path}' already exists. Use --force to overwrite.", err=True) + continue + + # Export the merged signature + try: + merged_signature.export(output_file_path) + click.echo(f"Merged signature for experiment '{exp_name}' exported to {output_file_path}") + logger.info(f"Merged signature for experiment '{exp_name}' exported to {output_file_path}") + summary.append((exp_name, len(sig_paths), output_file_path)) + except FileExistsError: + logger.error(f"Output file '{output_file_path}' already exists. Use --force to overwrite.") + click.echo(f"Error: Output file '{output_file_path}' already exists. Use --force to overwrite.", err=True) + continue + except Exception as e: + logger.error(f"Failed to export merged signature for experiment '{exp_name}': {e}") + click.echo(f"Error: Failed to export merged signature for experiment '{exp_name}': {e}", err=True) + continue + + # Summary Report + click.echo("\nGuided Merge Summary:") + click.echo(f"Total signatures mapped in table: {total_mapped}") + click.echo(f"Valid signatures processed: {total_valid}") + click.echo(f"Invalid or missing signatures skipped: {total_invalid}") + if invalid_files: + click.echo("List of invalid or missing signature files:") + for invalid in invalid_files: + click.echo(f" - {invalid}") + click.echo(f"Total experiments merged: {len(summary)}") + for exp_name, count, path in summary: + click.echo(f" - Experiment '{exp_name}': {count} signatures merged into {path}") + + if len(summary) < len(experiment_mapping): + skipped = len(experiment_mapping) - len(summary) + click.echo(f"Skipped {skipped} experiments due to errors or missing signatures.") \ No newline at end of file