Skip to content

Commit

Permalink
Merge pull request #15 from akikuno:develop-v0.6.0
Browse files Browse the repository at this point in the history
Develop-v0.6.0
  • Loading branch information
akikuno authored Sep 2, 2023
2 parents cfcaced + 68c29ff commit ca67532
Show file tree
Hide file tree
Showing 6 changed files with 225 additions and 36 deletions.
39 changes: 28 additions & 11 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@
- `cstag.mask()`: Mask low-quality bases in a CS tag
- `cstag.split()`: Split a CS tag
- `cstag.revcomp()`: Converts a CS tag into its reverse complement.
- `cstag.to_html()`: Output html report
- `cstag.to_vcf()`: Output VCF
- `cstag.to_html()`: Output HTML

Visit the [documentation](https://akikuno.github.io/cstag/cstag/) for more details.

Expand Down Expand Up @@ -46,11 +47,11 @@ cigar = "8M2D4M2I3N1M"
md = "2A5^AG7"
seq = "ACGTACGTACGTACG"

cstag.call(cigar, md, seq)
# => :2*ag:5-ag:4+ac~nn3nn:1
print(cstag.call(cigar, md, seq))
# :2*ag:5-ag:4+ac~nn3nn:1

cstag.call(cigar, md, seq, long=True)
# => =AC*ag=TACGT-ag=ACGT+ac~nn3nn=G
# =AC*ag=TACGT-ag=ACGT+ac~nn3nn=G
```

### Shorten or Lengthen CS Tags
Expand All @@ -62,7 +63,7 @@ import cstag
cs = "=ACGT*ag=CGT"

cstag.shorten(cs)
# => :4*ag:3
# :4*ag:3


# Convert a CS tag from short to long
Expand All @@ -71,7 +72,7 @@ cigar = "8M"
seq = "ACGTACGT"

cstag.lengthen(cs, cigar, seq)
# => =ACGT*ag=CGT
# =ACGT*ag=CGT
```

### Generate a Consensus
Expand All @@ -84,7 +85,7 @@ cigar_list = ["4M", "4M", "1S3M", "3M", "3M3I1M"]
pos_list = [1, 1, 1, 2, 1]

cstag.consensus(cs_list, cigar_list, pos_list)
# => =AC*gt*T
# =AC*gt*T
```

### Mask Low-Quality Bases
Expand All @@ -97,7 +98,7 @@ cigar = "5M2I2D1M"
qual = "AA!!!!AA"
phred_threshold = 10
cstag.mask(cs, cigar, qual, phred_threshold)
# => =ACNN*an+ng-cc=T
# =ACNN*an+ng-cc=T
```

### Split a CS Tag
Expand All @@ -107,7 +108,7 @@ import cstag

cs = "=ACGT*ac+gg-cc=T"
cstag.split(cs)
# => ['=ACGT', '*ac', '+gg', '-cc', '=T']
# ['=ACGT', '*ac', '+gg', '-cc', '=T']
```

### Reverse Complement of a CS Tag
Expand All @@ -117,9 +118,25 @@ import cstag

cs = "=ACGT*ac+gg-cc=T"
cstag.revcomp(cs)
# => =A-gg+cc*tg=ACGT
# =A-gg+cc*tg=ACGT
```

### Generate VCF Report

```python
import cstag
cs_tag = "=AC*gt=T-gg=C+tt=A"
chrom = "chr1"
pos = 1
print(cstag.to_vcf(cstag, chrom, pos))
"""
##fileformat=VCFv4.2
#CHROM POS ID REF ALT QUAL FILTER INFO
chr1 3 . G T . . .
chr1 4 . TGG T . . .
chr1 5 . C CTT . . .
"""
```

### Generate HTML Report

Expand All @@ -132,7 +149,7 @@ description = "Example"

cs_tag_html = cstag.to_html(cs_tag, description)
Path("report.html").write_text(cs_tag_html)
# => Output "report.html"
# Output "report.html"
```
The resulting `report.html` looks like this :point_down:

Expand Down
29 changes: 12 additions & 17 deletions pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
[build-system]
requires = ["setuptools", "wheel"]
build-backend = "setuptools.build_meta"
requires = ["poetry-core>=1.0.0"]
build-backend = "poetry.core.masonry.api"

[project]
[tool.poetry]
name = "cstag"
version = "0.5.1"
version = "0.6.0"
description = "Python module to manipulate the minimap2's CS tag"
authors = [{ name = "Akihiro Kuno", email = "[email protected]" }]
requires-python = ">=3.7"
readme = { file = "README.md", content-type = "text/markdown" }
license = { file = "LICENSE" }

authors = ["Akihiro Kuno <[email protected]>"]
homepage = "https://github.com/akikuno/cstag"
repository = "https://github.com/akikuno/cstag"
documentation = "https://akikuno.github.io/cstag/cstag"
readme = "README.md"
license = "MIT"
classifiers = [
"Programming Language :: Python :: 3",
"License :: OSI Approved :: MIT License",
Expand All @@ -19,11 +20,5 @@ classifiers = [
"Topic :: Scientific/Engineering :: Bio-Informatics",
]

[project.urls]
homepage = "https://github.com/akikuno/cstag"
repository = "https://github.com/akikuno/cstag"
documentation = "https://akikuno.github.io/cstag/cstag"


[tool.setuptools]
packages.find = { where = ["src"] }
[tool.poetry.dependencies]
python = "^3.7"
1 change: 1 addition & 0 deletions src/cstag/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@
from cstag.split import split
from cstag.revcomp import revcomp
from cstag.to_html import to_html
from cstag.to_vcf import to_vcf
34 changes: 26 additions & 8 deletions src/cstag/consensus.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,30 @@ def normalize_read_lengths(cs_list: list[deque[str]], starts: list[int]) -> list
return cs_list


def preprocess_for_consensus(cs_tags: list[str], cigars: list[str], positions: list[int]) -> list[deque[str]]:
"""
Preprocess cs_tags, cigars, and positions for consensus generation.
Args:
cs_tags (list[str]): List of CS tags in the long format.
cigars (list[str]): List of cigars strings.
positions (list[int]): List of 1-based leftmost mapping positions.
Returns:
list: A tuple containing a list of deques representing the normalized reads
"""
# Step 1: Calculate the starts positions for each read
starts = calculate_read_starts(positions, cigars)

# Step 2: Split and process each CS tag
cs_tags_normalized_length = split_cs_tags(cs_tags)

# Step 3: Normalize the lengths of each read
cs_tags_normalized_length = normalize_read_lengths(cs_tags_normalized_length, starts)

return cs_tags_normalized_length


def get_consensus(cs_list: list[deque[str]]) -> str:
cs_consensus = []
for cs in zip(*cs_list):
Expand Down Expand Up @@ -149,14 +173,8 @@ def consensus(cs_tags: list[str], cigars: list[str], positions: list[int], prefi
for cs_tag in cs_tags:
validate_long_format(cs_tag)

# Calculate the starts positions for each read
starts = calculate_read_starts(positions, cigars)

cs_list = split_cs_tags(cs_tags)

# Normalize the lengths of each read
cs_list = normalize_read_lengths(cs_list, starts)
cs_tags_normalized_length = preprocess_for_consensus(cs_tags, cigars, positions)

cs_consensus = get_consensus(cs_list)
cs_consensus = get_consensus(cs_tags_normalized_length)

return f"cs:Z:{cs_consensus}" if prefix else cs_consensus
102 changes: 102 additions & 0 deletions src/cstag/to_vcf.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
from __future__ import annotations

from cstag.split import split
from collections import deque

from cstag.utils.validator import validate_long_format


def find_ref_for_insertion(cs_tag_split: list[str], idx: int) -> str | None:
idx_ref = idx - 1
while idx_ref >= 0:
cs = cs_tag_split[idx_ref]
if cs[0] in ["=", "-"]:
return cs[-1].upper()
if cs.startswith("*"):
return cs[1].upper()
idx_ref -= 1
return None


def find_ref_for_deletion(cs_tag_split: list[str], idx: int) -> str:
ref = deque([cs_tag_split[idx][1:].upper()])
idx_ref = idx - 1
while idx_ref >= 0:
cs = cs_tag_split[idx_ref]
if cs.startswith("="):
ref.appendleft(cs[-1].upper())
break
if cs.startswith("*"):
ref.appendleft(cs[1].upper())
break
idx_ref -= 1
return "".join(ref)


def get_variant_annotations(cs_tag_split: list[str], position: int) -> list[tuple[int, str, str]]:
variant_annotations = []
pos = position
for idx, cs in enumerate(cs_tag_split):
if cs.startswith("="):
pos += len(cs) - 1
elif cs.startswith("*"):
ref, alt = cs[1].upper(), cs[2].upper()
variant_annotations.append((pos, ref, alt))
pos += 1
elif cs.startswith("+"):
ref = find_ref_for_insertion(cs_tag_split, idx)
alt = ref + cs[1:].upper()
variant_annotations.append((pos - 1, ref, alt))
elif cs.startswith("-"):
ref = find_ref_for_deletion(cs_tag_split, idx)
variant_annotations.append((pos - 1, ref, ref[0]))
elif cs.startswith("~"):
continue

return variant_annotations


###########################################################
# main
###########################################################


def to_vcf(cs_tag: str, chrom: str, pos: int) -> str:
"""
Convert a CS tag to VCF (Variant Call Format) string.
Args:
cs_tag (str): The CS tag representing the sequence alignment.
chrom (str): The chromosome name.
pos (int): The starting position for the sequence.
Returns:
str: The VCF-formatted string.
Example:
>>> import cstag
>>> cs_tag = "=AC*gt=T-gg=C+tt=A"
>>> chrom = "chr1"
>>> pos = 1
>>> print(cstag.to_vcf(cstag, chrom, pos))
##fileformat=VCFv4.2
#CHROM POS ID REF ALT QUAL FILTER INFO
chr1 3 . G T . . .
chr1 4 . TGG T . . .
chr1 5 . C CTT . . .
"""

validate_long_format(cs_tag)

cs_tag_split = split(cs_tag)

# Call POS, REF, ALT
variants = get_variant_annotations(cs_tag_split, pos)

# Write VCF
HEADER = "##fileformat=VCFv4.2\n#CHROM\tPOS\tID\tREF\tALT\tQUAL\tFILTER\tINFO\n"

vcf = HEADER.strip().split("\n")
for pos, ref, alt in variants:
vcf.append(f"{chrom}\t{pos}\t.\t{ref}\t{alt}\t.\t.\t.")

return "\n".join(vcf)
56 changes: 56 additions & 0 deletions tests/test_to_vcf.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
from src.cstag.to_vcf import find_ref_for_insertion, find_ref_for_deletion, get_variant_annotations, to_vcf


# find_ref_for_insertionのテスト
def test_find_ref_for_insertion():
assert find_ref_for_insertion(["=ACGT", "*ga", "+a"], 0) is None
assert find_ref_for_insertion(["=ACGT", "*ga", "+a"], 1) == "T"
assert find_ref_for_insertion(["=ACGT", "*ga", "+a"], 2) == "G"
assert find_ref_for_insertion(["=AC", "=GT", "-g", "+a"], 3) == "G"


# find_ref_for_deletionのテスト
def test_find_ref_for_deletion():
assert find_ref_for_deletion(["=AC", "-g"], 1) == "CG"
assert find_ref_for_deletion(["=ACGT", "*ga", "-a"], 2) == "GA"
assert find_ref_for_deletion(["=ACGT", "*ga", "-ac"], 2) == "GAC"
assert find_ref_for_deletion(["=AC", "=GT", "+a", "-a"], 3) == "TA"


# get_variant_annotationsのテスト
def test_get_variant_annotations():
# single mutation
assert get_variant_annotations(["=AC", "*ga", "=AC"], 1) == [(3, "G", "A")]
assert get_variant_annotations(["=AC", "+a", "=AC"], 1) == [(2, "C", "CA")]
assert get_variant_annotations(["=AC", "-a", "=AC"], 1) == [(2, "CA", "C")]
# double mutations
assert get_variant_annotations(["=AC", "*ga", "=AC", "*ct"], 1) == [(3, "G", "A"), (6, "C", "T")]
assert get_variant_annotations(["=AC", "+a", "=AC", "+aa"], 1) == [(2, "C", "CA"), (4, "C", "CAA")]
assert get_variant_annotations(["=AC", "-a", "=AC", "-aa"], 1) == [(2, "CA", "C"), (4, "CAA", "C")]
# combinations
assert get_variant_annotations(["=ACGT", "*ga", "+a"], 1) == [(5, "G", "A"), (5, "G", "GA")]
assert get_variant_annotations(["=ACGT", "*ga", "-a"], 1) == [(5, "G", "A"), (5, "GA", "G")]
assert get_variant_annotations(["=ACGT", "*ga", "-ac"], 1) == [(5, "G", "A"), (5, "GAC", "G")]
# position
assert get_variant_annotations(["=AC", "*ga", "=AC", "*ct"], 10) == [(12, "G", "A"), (15, "C", "T")]


# to_vcf関数のテスト
def test_to_vcf():
cs_tag1 = "=AC*gt=T-gg=C+tt=A"
chrom1 = "chr1"
pos1 = 1
expected_output1 = """##fileformat=VCFv4.2
#CHROM POS ID REF ALT QUAL FILTER INFO
chr1 3 . G T . . .
chr1 4 . TGG T . . .
chr1 5 . C CTT . . ."""
assert to_vcf(cs_tag1, chrom1, pos1) == expected_output1

cs_tag2 = "=AC*ga"
chrom2 = "2"
pos2 = 1
expected_output2 = """##fileformat=VCFv4.2
#CHROM POS ID REF ALT QUAL FILTER INFO
2 3 . G A . . ."""
assert to_vcf(cs_tag2, chrom2, pos2) == expected_output2

0 comments on commit ca67532

Please sign in to comment.