diff --git a/README.md b/README.md index 7233a39..66bb078 100644 --- a/README.md +++ b/README.md @@ -18,7 +18,8 @@ - `cstag.mask()`: Mask low-quality bases in a CS tag - `cstag.split()`: Split a CS tag - `cstag.revcomp()`: Converts a CS tag into its reverse complement. -- `cstag.to_html()`: Output html report +- `cstag.to_vcf()`: Output VCF +- `cstag.to_html()`: Output HTML Visit the [documentation](https://akikuno.github.io/cstag/cstag/) for more details. @@ -46,11 +47,11 @@ cigar = "8M2D4M2I3N1M" md = "2A5^AG7" seq = "ACGTACGTACGTACG" -cstag.call(cigar, md, seq) -# => :2*ag:5-ag:4+ac~nn3nn:1 +print(cstag.call(cigar, md, seq)) +# :2*ag:5-ag:4+ac~nn3nn:1 cstag.call(cigar, md, seq, long=True) -# => =AC*ag=TACGT-ag=ACGT+ac~nn3nn=G +# =AC*ag=TACGT-ag=ACGT+ac~nn3nn=G ``` ### Shorten or Lengthen CS Tags @@ -62,7 +63,7 @@ import cstag cs = "=ACGT*ag=CGT" cstag.shorten(cs) -# => :4*ag:3 +# :4*ag:3 # Convert a CS tag from short to long @@ -71,7 +72,7 @@ cigar = "8M" seq = "ACGTACGT" cstag.lengthen(cs, cigar, seq) -# => =ACGT*ag=CGT +# =ACGT*ag=CGT ``` ### Generate a Consensus @@ -84,7 +85,7 @@ cigar_list = ["4M", "4M", "1S3M", "3M", "3M3I1M"] pos_list = [1, 1, 1, 2, 1] cstag.consensus(cs_list, cigar_list, pos_list) -# => =AC*gt*T +# =AC*gt*T ``` ### Mask Low-Quality Bases @@ -97,7 +98,7 @@ cigar = "5M2I2D1M" qual = "AA!!!!AA" phred_threshold = 10 cstag.mask(cs, cigar, qual, phred_threshold) -# => =ACNN*an+ng-cc=T +# =ACNN*an+ng-cc=T ``` ### Split a CS Tag @@ -107,7 +108,7 @@ import cstag cs = "=ACGT*ac+gg-cc=T" cstag.split(cs) -# => ['=ACGT', '*ac', '+gg', '-cc', '=T'] +# ['=ACGT', '*ac', '+gg', '-cc', '=T'] ``` ### Reverse Complement of a CS Tag @@ -117,9 +118,25 @@ import cstag cs = "=ACGT*ac+gg-cc=T" cstag.revcomp(cs) -# => =A-gg+cc*tg=ACGT +# =A-gg+cc*tg=ACGT ``` +### Generate VCF Report + +```python +import cstag +cs_tag = "=AC*gt=T-gg=C+tt=A" +chrom = "chr1" +pos = 1 +print(cstag.to_vcf(cstag, chrom, pos)) +""" +##fileformat=VCFv4.2 +#CHROM POS ID REF ALT QUAL FILTER INFO +chr1 3 . G T . . . +chr1 4 . TGG T . . . +chr1 5 . C CTT . . . +""" +``` ### Generate HTML Report @@ -132,7 +149,7 @@ description = "Example" cs_tag_html = cstag.to_html(cs_tag, description) Path("report.html").write_text(cs_tag_html) -# => Output "report.html" +# Output "report.html" ``` The resulting `report.html` looks like this :point_down: diff --git a/pyproject.toml b/pyproject.toml index d3dd9ab..597be2d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,16 +1,17 @@ [build-system] -requires = ["setuptools", "wheel"] -build-backend = "setuptools.build_meta" +requires = ["poetry-core>=1.0.0"] +build-backend = "poetry.core.masonry.api" -[project] +[tool.poetry] name = "cstag" -version = "0.5.1" +version = "0.6.0" description = "Python module to manipulate the minimap2's CS tag" -authors = [{ name = "Akihiro Kuno", email = "akuno@md.tsukuba.ac.jp" }] -requires-python = ">=3.7" -readme = { file = "README.md", content-type = "text/markdown" } -license = { file = "LICENSE" } - +authors = ["Akihiro Kuno "] +homepage = "https://github.com/akikuno/cstag" +repository = "https://github.com/akikuno/cstag" +documentation = "https://akikuno.github.io/cstag/cstag" +readme = "README.md" +license = "MIT" classifiers = [ "Programming Language :: Python :: 3", "License :: OSI Approved :: MIT License", @@ -19,11 +20,5 @@ classifiers = [ "Topic :: Scientific/Engineering :: Bio-Informatics", ] -[project.urls] -homepage = "https://github.com/akikuno/cstag" -repository = "https://github.com/akikuno/cstag" -documentation = "https://akikuno.github.io/cstag/cstag" - - -[tool.setuptools] -packages.find = { where = ["src"] } +[tool.poetry.dependencies] +python = "^3.7" diff --git a/src/cstag/__init__.py b/src/cstag/__init__.py index 3cbb323..79600a4 100644 --- a/src/cstag/__init__.py +++ b/src/cstag/__init__.py @@ -10,3 +10,4 @@ from cstag.split import split from cstag.revcomp import revcomp from cstag.to_html import to_html +from cstag.to_vcf import to_vcf diff --git a/src/cstag/consensus.py b/src/cstag/consensus.py index 5fc73eb..60ce23d 100644 --- a/src/cstag/consensus.py +++ b/src/cstag/consensus.py @@ -100,6 +100,30 @@ def normalize_read_lengths(cs_list: list[deque[str]], starts: list[int]) -> list return cs_list +def preprocess_for_consensus(cs_tags: list[str], cigars: list[str], positions: list[int]) -> list[deque[str]]: + """ + Preprocess cs_tags, cigars, and positions for consensus generation. + + Args: + cs_tags (list[str]): List of CS tags in the long format. + cigars (list[str]): List of cigars strings. + positions (list[int]): List of 1-based leftmost mapping positions. + + Returns: + list: A tuple containing a list of deques representing the normalized reads + """ + # Step 1: Calculate the starts positions for each read + starts = calculate_read_starts(positions, cigars) + + # Step 2: Split and process each CS tag + cs_tags_normalized_length = split_cs_tags(cs_tags) + + # Step 3: Normalize the lengths of each read + cs_tags_normalized_length = normalize_read_lengths(cs_tags_normalized_length, starts) + + return cs_tags_normalized_length + + def get_consensus(cs_list: list[deque[str]]) -> str: cs_consensus = [] for cs in zip(*cs_list): @@ -149,14 +173,8 @@ def consensus(cs_tags: list[str], cigars: list[str], positions: list[int], prefi for cs_tag in cs_tags: validate_long_format(cs_tag) - # Calculate the starts positions for each read - starts = calculate_read_starts(positions, cigars) - - cs_list = split_cs_tags(cs_tags) - - # Normalize the lengths of each read - cs_list = normalize_read_lengths(cs_list, starts) + cs_tags_normalized_length = preprocess_for_consensus(cs_tags, cigars, positions) - cs_consensus = get_consensus(cs_list) + cs_consensus = get_consensus(cs_tags_normalized_length) return f"cs:Z:{cs_consensus}" if prefix else cs_consensus diff --git a/src/cstag/to_vcf.py b/src/cstag/to_vcf.py new file mode 100644 index 0000000..a7b4483 --- /dev/null +++ b/src/cstag/to_vcf.py @@ -0,0 +1,102 @@ +from __future__ import annotations + +from cstag.split import split +from collections import deque + +from cstag.utils.validator import validate_long_format + + +def find_ref_for_insertion(cs_tag_split: list[str], idx: int) -> str | None: + idx_ref = idx - 1 + while idx_ref >= 0: + cs = cs_tag_split[idx_ref] + if cs[0] in ["=", "-"]: + return cs[-1].upper() + if cs.startswith("*"): + return cs[1].upper() + idx_ref -= 1 + return None + + +def find_ref_for_deletion(cs_tag_split: list[str], idx: int) -> str: + ref = deque([cs_tag_split[idx][1:].upper()]) + idx_ref = idx - 1 + while idx_ref >= 0: + cs = cs_tag_split[idx_ref] + if cs.startswith("="): + ref.appendleft(cs[-1].upper()) + break + if cs.startswith("*"): + ref.appendleft(cs[1].upper()) + break + idx_ref -= 1 + return "".join(ref) + + +def get_variant_annotations(cs_tag_split: list[str], position: int) -> list[tuple[int, str, str]]: + variant_annotations = [] + pos = position + for idx, cs in enumerate(cs_tag_split): + if cs.startswith("="): + pos += len(cs) - 1 + elif cs.startswith("*"): + ref, alt = cs[1].upper(), cs[2].upper() + variant_annotations.append((pos, ref, alt)) + pos += 1 + elif cs.startswith("+"): + ref = find_ref_for_insertion(cs_tag_split, idx) + alt = ref + cs[1:].upper() + variant_annotations.append((pos - 1, ref, alt)) + elif cs.startswith("-"): + ref = find_ref_for_deletion(cs_tag_split, idx) + variant_annotations.append((pos - 1, ref, ref[0])) + elif cs.startswith("~"): + continue + + return variant_annotations + + +########################################################### +# main +########################################################### + + +def to_vcf(cs_tag: str, chrom: str, pos: int) -> str: + """ + Convert a CS tag to VCF (Variant Call Format) string. + + Args: + cs_tag (str): The CS tag representing the sequence alignment. + chrom (str): The chromosome name. + pos (int): The starting position for the sequence. + + Returns: + str: The VCF-formatted string. + Example: + >>> import cstag + >>> cs_tag = "=AC*gt=T-gg=C+tt=A" + >>> chrom = "chr1" + >>> pos = 1 + >>> print(cstag.to_vcf(cstag, chrom, pos)) + ##fileformat=VCFv4.2 + #CHROM POS ID REF ALT QUAL FILTER INFO + chr1 3 . G T . . . + chr1 4 . TGG T . . . + chr1 5 . C CTT . . . + """ + + validate_long_format(cs_tag) + + cs_tag_split = split(cs_tag) + + # Call POS, REF, ALT + variants = get_variant_annotations(cs_tag_split, pos) + + # Write VCF + HEADER = "##fileformat=VCFv4.2\n#CHROM\tPOS\tID\tREF\tALT\tQUAL\tFILTER\tINFO\n" + + vcf = HEADER.strip().split("\n") + for pos, ref, alt in variants: + vcf.append(f"{chrom}\t{pos}\t.\t{ref}\t{alt}\t.\t.\t.") + + return "\n".join(vcf) diff --git a/tests/test_to_vcf.py b/tests/test_to_vcf.py new file mode 100644 index 0000000..5785f06 --- /dev/null +++ b/tests/test_to_vcf.py @@ -0,0 +1,56 @@ +from src.cstag.to_vcf import find_ref_for_insertion, find_ref_for_deletion, get_variant_annotations, to_vcf + + +# find_ref_for_insertionのテスト +def test_find_ref_for_insertion(): + assert find_ref_for_insertion(["=ACGT", "*ga", "+a"], 0) is None + assert find_ref_for_insertion(["=ACGT", "*ga", "+a"], 1) == "T" + assert find_ref_for_insertion(["=ACGT", "*ga", "+a"], 2) == "G" + assert find_ref_for_insertion(["=AC", "=GT", "-g", "+a"], 3) == "G" + + +# find_ref_for_deletionのテスト +def test_find_ref_for_deletion(): + assert find_ref_for_deletion(["=AC", "-g"], 1) == "CG" + assert find_ref_for_deletion(["=ACGT", "*ga", "-a"], 2) == "GA" + assert find_ref_for_deletion(["=ACGT", "*ga", "-ac"], 2) == "GAC" + assert find_ref_for_deletion(["=AC", "=GT", "+a", "-a"], 3) == "TA" + + +# get_variant_annotationsのテスト +def test_get_variant_annotations(): + # single mutation + assert get_variant_annotations(["=AC", "*ga", "=AC"], 1) == [(3, "G", "A")] + assert get_variant_annotations(["=AC", "+a", "=AC"], 1) == [(2, "C", "CA")] + assert get_variant_annotations(["=AC", "-a", "=AC"], 1) == [(2, "CA", "C")] + # double mutations + assert get_variant_annotations(["=AC", "*ga", "=AC", "*ct"], 1) == [(3, "G", "A"), (6, "C", "T")] + assert get_variant_annotations(["=AC", "+a", "=AC", "+aa"], 1) == [(2, "C", "CA"), (4, "C", "CAA")] + assert get_variant_annotations(["=AC", "-a", "=AC", "-aa"], 1) == [(2, "CA", "C"), (4, "CAA", "C")] + # combinations + assert get_variant_annotations(["=ACGT", "*ga", "+a"], 1) == [(5, "G", "A"), (5, "G", "GA")] + assert get_variant_annotations(["=ACGT", "*ga", "-a"], 1) == [(5, "G", "A"), (5, "GA", "G")] + assert get_variant_annotations(["=ACGT", "*ga", "-ac"], 1) == [(5, "G", "A"), (5, "GAC", "G")] + # position + assert get_variant_annotations(["=AC", "*ga", "=AC", "*ct"], 10) == [(12, "G", "A"), (15, "C", "T")] + + +# to_vcf関数のテスト +def test_to_vcf(): + cs_tag1 = "=AC*gt=T-gg=C+tt=A" + chrom1 = "chr1" + pos1 = 1 + expected_output1 = """##fileformat=VCFv4.2 +#CHROM POS ID REF ALT QUAL FILTER INFO +chr1 3 . G T . . . +chr1 4 . TGG T . . . +chr1 5 . C CTT . . .""" + assert to_vcf(cs_tag1, chrom1, pos1) == expected_output1 + + cs_tag2 = "=AC*ga" + chrom2 = "2" + pos2 = 1 + expected_output2 = """##fileformat=VCFv4.2 +#CHROM POS ID REF ALT QUAL FILTER INFO +2 3 . G A . . .""" + assert to_vcf(cs_tag2, chrom2, pos2) == expected_output2