diff --git a/BALSAMIC/assets/scripts/merge_snv_variantcallers.py b/BALSAMIC/assets/scripts/merge_snv_variantcallers.py index 0c1dcec32..869a165f2 100644 --- a/BALSAMIC/assets/scripts/merge_snv_variantcallers.py +++ b/BALSAMIC/assets/scripts/merge_snv_variantcallers.py @@ -1,22 +1,42 @@ #!/usr/bin/env python +from typing import Dict, List, Tuple, Optional +import click import gzip import sys import re -import click -def collect_header(vcf): +def collect_header(vcf: str) -> List[str]: + """ + Collects the header lines from a VCF file. + + Parameters: + - vcf (str): Path to the VCF file. + + Returns: + - List[str]: List of header lines. + """ header = [] with gzip.open(vcf, "rt") as gz_vcf: for line in gz_vcf: if line.startswith("#"): - header.append(line) + header.append(line.strip()) else: break return header def add_header_categories(header_categories, header): + """ + Categorizes VCF headers by type (e.g., FILTER, INFO). + + Parameters: + - header_categories (Dict[str, List[str]]): Existing header categories. + - header (List[str]): List of header lines. + + Returns: + - Dict[str, List[str]]: Updated header categories. + """ for hdr_row in header: hdr_row_split = hdr_row.split("=") cat = hdr_row_split[0].strip("#") @@ -28,33 +48,72 @@ def add_header_categories(header_categories, header): return header_categories -def get_description_field(header_row): - pattern = r'Description="(.*?)"' - match = re.search(pattern, header_row) - if match: - return match.group(1) - return None +def get_description_field(header_row: str) -> Optional[str]: + """ + Extracts the Description field from a VCF header row. + + Parameters: + - header_row (str): A header row. + + Returns: + - Optional[str]: The description field, if present. + """ + match = re.search(r'Description="(.*?)"', header_row) + return match.group(1) if match else None + + +def update_description_text(row: str, new_description: str) -> str: + """ + Updates the Description field in a VCF header row. + Parameters: + - row (str): A header row. + - new_description (str): The new description text. -def update_description_text(row, new_description): + Returns: + - str: Updated header row. + """ return re.sub(r'Description="(.*?)"', f'Description="{new_description}"', row) -def get_number_field(header_row): - pattern = r"Number=([^,]+)" - match = re.search(pattern, header_row) - if match: - return match.group(1) - return None +def get_number_field(header_row: str) -> Optional[str]: + """ + Extracts the Number field from a VCF header row. + + Parameters: + - header_row (str): A header row. + + Returns: + - Optional[str]: The Number field, if present. + """ + match = re.search(r"Number=([^,]+)", header_row) + return match.group(1) if match else None + +def update_number_text(row: str, new_number: str) -> str: + """ + Updates the Number field in a VCF header row. -def update_number_text(row, new_number): - return re.sub(r"Number=([^,]+)", f'Number="{new_number}"', row) + Parameters: + - row (str): A header row. + - new_number (str): The new Number field value. + + Returns: + - str: Updated header row. + """ + return re.sub(r"Number=([^,]+)", f"Number={new_number}", row) def merge_headers(vcf1, vcf2): """ - Combines all header lines from two VCF files, adding unique lines from each. + Merges headers from two VCF files, ensuring no duplicate or conflicting entries. + + Parameters: + - vcf1 (str): Path to the first VCF file. + - vcf2 (str): Path to the second VCF file. + + Returns: + - List[str]: Merged header lines. """ header1 = collect_header(vcf1) header2 = collect_header(vcf2) @@ -114,175 +173,146 @@ def merge_headers(vcf1, vcf2): return merged_header -def merge_header_row(line1, line2): - updated_line = line1 +def merge_header_row(line1: str, line2: str) -> str: + """ + Merges two header rows with the same ID, resolving conflicts in Description and Number fields. + + Parameters: + - line1 (str): First header row. + - line2 (str): Second header row. - # If Descriptions are different, merge them - description1 = get_description_field(line1) - description2 = get_description_field(line2) - if description1 and description2 and description1 != description2: - # Merge Descriptions - merged_description = f"vcf1: {description1} | vcf2: {description2}" - updated_line = update_description_text(line1, merged_description) - - # If Number is different, set to "." - number1 = get_number_field(line1) - number2 = get_number_field(line2) - if number1 and number2 and number1 != number2: - # Conflicting Number type, setting to "." - # Print warning to standard error + Returns: + - str: Merged header row. + """ + updated_line = line1 + desc1 = get_description_field(line1) + desc2 = get_description_field(line2) + if desc1 and desc2 and desc1 != desc2: + merged_desc = f"vcf1: {desc1} | vcf2: {desc2}" + updated_line = update_description_text(updated_line, merged_desc) + + num1 = get_number_field(line1) + num2 = get_number_field(line2) + if num1 and num2 and num1 != num2: + updated_line = update_number_text(updated_line, ".") print( - f"Warning: Number fields differ for header lines. line1: {line1}, line2: {line2}", + f"Warning: Number fields differ. line1: {line1}, line2: {line2}", file=sys.stderr, ) - updated_line = update_number_text(updated_line, ".") return updated_line -def read_variants(vcf): - variants = {} +def read_variants(vcf: str) -> Dict[Tuple[str, str, str, str], Dict[str, str]]: + """ + Reads variants from a VCF file, storing them in a dictionary keyed by variant identifiers. + Parameters: + - vcf (str): Path to the VCF file. + + Returns: + - Dict[Tuple[str, str, str, str], Dict[str, str]]: Dictionary of variant data. + """ + variants = {} with gzip.open(vcf, "rt") as gz_vcf: for line in gz_vcf: if line.startswith("#"): - continue # Skip header lines + continue fields = line.strip().split("\t") - - normal = False - if len(fields) > 10: # TUMOR + NORMAL - ( - chrom, - pos, - id, - ref, - alt, - qual, - filter, - info, - format, - tumor, - normal, - ) = fields[:11] - else: # TUMOR ONLY - chrom, pos, id, ref, alt, qual, filter, info, format, tumor = fields[ - :11 - ] - - # Use a tuple of (chrom, pos, ref, alt) as the key + chrom, pos, id, ref, alt, qual, filter_, info, format_, *samples = fields key = (chrom, pos, ref, alt) - - # Store the variant along with its ID, QUAL, and FILTER, and INFO field - variants[key] = {} - variants[key]["chrom"] = chrom - variants[key]["pos"] = pos - variants[key]["id"] = id - variants[key]["ref"] = ref - variants[key]["alt"] = alt - variants[key]["qual"] = qual - variants[key]["filter"] = filter - variants[key]["info"] = info - variants[key]["format"] = format - variants[key]["tumor"] = tumor - if normal: - variants[key]["normal"] = normal - + variants[key] = { + "chrom": chrom, + "pos": pos, + "id": id, + "ref": ref, + "alt": alt, + "qual": qual, + "filter": filter_, + "info": info, + "format": format_, + "samples": samples, + } return variants -def merge_info_fields(info_fields): +def merge_info_fields(info_fields: List[str]) -> str: """ - Merges INFO fields by combining all unique key-value pairs from a list of INFO fields. + Merges multiple INFO fields into a single field, preserving all unique key-value pairs. + + Parameters: + - info_fields (List[str]): List of INFO fields. + + Returns: + - str: Merged INFO field. """ merged_info = {} - merged_info_single_fields = {} - for info in info_fields: - info_key_value = info.split("=") - if len(info_key_value) < 2: - key = info_key_value[0] - merged_info_single_fields[key] = key - continue - - key = info_key_value[0] - value = info_key_value[1] - - if key in merged_info: - existing_val = merged_info[key] - merged_info[key] = ",".join([existing_val, value]) - else: - merged_info[key] = value - - # Recreate INFO field with merged key-value pairs - merged_info_str = ";".join([f"{key}" for key in merged_info_single_fields]) - merged_info_str = merged_info_str + ";".join( - [f"{key}={value}" for key, value in merged_info.items()] - ) - return merged_info_str + for field in info_fields: + key, sep, value = field.partition("=") + if sep: # Key-value pair + merged_info[key] = ( + merged_info.get(key, "") + ("," if key in merged_info else "") + value + ) + else: # Key only + merged_info[key] = None + return ";".join(f"{k}={v}" if v is not None else k for k, v in merged_info.items()) + + +def merge_variants(vcf1: str, vcf2: str) -> List[str]: + """ + Merges variants from two VCF files. + Parameters: + - vcf1 (str): Path to the first VCF file. + - vcf2 (str): Path to the second VCF file. -def merge_variants(vcf1, vcf2): + Returns: + - List[str]: Merged variant lines. """ - Merges variants from two VCF files based on the same chromosome, position, and alteration. - INFO fields are merged for variants with the same keys. ID, QUAL, and FILTER values - from vcf1 are prioritized if variants overlap. - """ - variants_vcf1 = read_variants(vcf1) - variants_vcf2 = read_variants(vcf2) + variants1 = read_variants(vcf1) + variants2 = read_variants(vcf2) - # Now, merge the variants with the same key merged_variants = [] - for variant in variants_vcf1: - v_dict = variants_vcf1[variant] - chrom = v_dict["chrom"] - pos = v_dict["pos"] - id = v_dict["id"] - ref = v_dict["ref"] - alt = v_dict["alt"] - qual = v_dict["qual"] - filter = v_dict["filter"] - info_1 = v_dict["info"].split(";") - format = v_dict["format"] - tumor = v_dict["tumor"] - if "normal" in v_dict: - normal = v_dict["normal"] - - if variant in variants_vcf2: - # Choose ID and merge INFO field - id_2 = variants_vcf2[variant]["id"] - info_2 = variants_vcf2[variant]["info"].split(";") - - if id != id_2 and id == ".": # Switch ID to ID from VCF2 - id = id_2 - - info_fields = [] - for info in info_1: - info_fields.append(info) - for info in info_2: - info_fields.append(info) - - merged_info = merge_info_fields(info_fields) - - # Combine the variant fields (ID, QUAL, FILTER prioritized from vcf1) - if "normal" in variants_vcf1[variant]: - merged_variant = f"{chrom}\t{pos}\t{id}\t{ref}\t{alt}\t{qual}\t{filter}\t{merged_info}\t{format}\t{tumor}\t{normal}" + for key, variant1 in variants1.items(): + variant2 = variants2.get(key) + if variant2: + id_ = variant1["id"] if variant1["id"] != "." else variant2["id"] + merged_info = merge_info_fields( + variant1["info"].split(";") + variant2["info"].split(";") + ) else: - merged_variant = f"{chrom}\t{pos}\t{id}\t{ref}\t{alt}\t{qual}\t{filter}\t{merged_info}\t{format}\t{tumor}" - merged_variants.append(merged_variant) - - return merged_variants + id_ = variant1["id"] + merged_info = variant1["info"] + merged_variants.append( + "\t".join( + [ + variant1["chrom"], + variant1["pos"], + id_, + variant1["ref"], + variant1["alt"], + variant1["qual"], + variant1["filter"], + merged_info, + variant1["format"], + ] + + variant1["samples"] + ) + ) -def write_merged_vcf(output_file, header, merged_variants): +def write_merged_vcf(output_file: str, header: List[str], variants: List[str]) -> None: """ - Writes the merged variants along with the header to the output VCF file. + Writes the merged VCF to a file. + + Parameters: + - output_file (str): Path to the output file. + - header (List[str]): Merged header lines. + - variants (List[str]): Merged variant lines. """ with open(output_file, "w") as f: - # Write the header - for line in header: - f.write(line + "\n") - # Write merged variants - for variant in merged_variants: - f.write(variant + "\n") + f.write("\n".join(header) + "\n") + f.write("\n".join(variants) + "\n") @click.command()