Skip to content

Commit

Permalink
Merge pull request #16 from akikuno:develop-v0.6.1
Browse files Browse the repository at this point in the history
bug fix: `cstag.consensus`
  • Loading branch information
akikuno authored Sep 3, 2023
2 parents ca67532 + ee6043c commit 59bf22e
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 125 deletions.
21 changes: 10 additions & 11 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ print(cstag.call(cigar, md, seq))
# :2*ag:5-ag:4+ac~nn3nn:1

cstag.call(cigar, md, seq, long=True)
# =AC*ag=TACGT-ag=ACGT+ac~nn3nn=G
# =AC*ag=TACGT-ag=ACGT+ac~nn3nn=G
```

### Shorten or Lengthen CS Tags
Expand All @@ -63,7 +63,7 @@ import cstag
cs = "=ACGT*ag=CGT"

cstag.shorten(cs)
# :4*ag:3
# :4*ag:3


# Convert a CS tag from short to long
Expand All @@ -72,20 +72,19 @@ cigar = "8M"
seq = "ACGTACGT"

cstag.lengthen(cs, cigar, seq)
# =ACGT*ag=CGT
# =ACGT*ag=CGT
```

### Generate a Consensus

```python
import cstag

cs_list = ["=ACGT", "=AC*gt=T", "=C*gt=T", "=C*gt=T", "=ACT+ccc=T"]
cigar_list = ["4M", "4M", "1S3M", "3M", "3M3I1M"]
pos_list = [1, 1, 1, 2, 1]
cs_tags = ["=ACGT", "=AC*gt=T", "=C*gt=T", "=C*gt=T", "=ACT+ccc=T"]
positions = [1, 1, 2, 2, 1]

cstag.consensus(cs_list, cigar_list, pos_list)
# =AC*gt*T
cstag.consensus(cs_tags, positions)
# =AC*gt*T
```

### Mask Low-Quality Bases
Expand All @@ -98,7 +97,7 @@ cigar = "5M2I2D1M"
qual = "AA!!!!AA"
phred_threshold = 10
cstag.mask(cs, cigar, qual, phred_threshold)
# =ACNN*an+ng-cc=T
# =ACNN*an+ng-cc=T
```

### Split a CS Tag
Expand All @@ -108,7 +107,7 @@ import cstag

cs = "=ACGT*ac+gg-cc=T"
cstag.split(cs)
# ['=ACGT', '*ac', '+gg', '-cc', '=T']
# ['=ACGT', '*ac', '+gg', '-cc', '=T']
```

### Reverse Complement of a CS Tag
Expand All @@ -118,7 +117,7 @@ import cstag

cs = "=ACGT*ac+gg-cc=T"
cstag.revcomp(cs)
# =A-gg+cc*tg=ACGT
# =A-gg+cc*tg=ACGT
```

### Generate VCF Report
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "poetry.core.masonry.api"

[tool.poetry]
name = "cstag"
version = "0.6.0"
version = "0.6.1"
description = "Python module to manipulate the minimap2's CS tag"
authors = ["Akihiro Kuno <[email protected]>"]
homepage = "https://github.com/akikuno/cstag"
Expand Down
81 changes: 9 additions & 72 deletions src/cstag/consensus.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,47 +7,6 @@
from cstag.utils.validator import validate_long_format


def extract_softclips(cigars: list[str]) -> list[int]:
"""
Extract the length of softclips from each cigars string.
Args:
cigars (list[str]): list of cigars strings.
Returns:
list[int]: list of softclip lengths for each cigars string.
"""
softclip_lengths = []
for cigar in cigars:
# Check if the cigars string starts with a softclip (e.g., "4S3M")
if re.match(r"^[0-9]+S", cigar):
# Extract the length of the softclip using regex and convert it to an integer
softclip_length = int(re.sub(r"^([0-9]+)S.*", r"\1", cigar))
else:
# No softclip, so the length is 0
softclip_length = 0
softclip_lengths.append(softclip_length)
return softclip_lengths


def calculate_read_starts(positions: list[int], cigars: list[str]) -> list[int]:
"""
Calculate the start positions of each read based on positions and cigars strings.
Args:
positions (list[int]): 1-based leftmost mapping positions.
cigars (list[str]): cigars strings indicating the mapping of each read.
Returns:
list[int]: Calculated start positions for each read.
"""
pos_min = min(positions)
pos_offsets = [pos - pos_min for pos in positions]
softclips = extract_softclips(cigars)
starts = [p + s for p, s in zip(pos_offsets, softclips)]
return starts


def split_cs_tags(cs_tags: list[str]) -> list[deque[str]]:
"""
Split and process each cs tag in cs_tags.
Expand Down Expand Up @@ -100,30 +59,6 @@ def normalize_read_lengths(cs_list: list[deque[str]], starts: list[int]) -> list
return cs_list


def preprocess_for_consensus(cs_tags: list[str], cigars: list[str], positions: list[int]) -> list[deque[str]]:
"""
Preprocess cs_tags, cigars, and positions for consensus generation.
Args:
cs_tags (list[str]): List of CS tags in the long format.
cigars (list[str]): List of cigars strings.
positions (list[int]): List of 1-based leftmost mapping positions.
Returns:
list: A tuple containing a list of deques representing the normalized reads
"""
# Step 1: Calculate the starts positions for each read
starts = calculate_read_starts(positions, cigars)

# Step 2: Split and process each CS tag
cs_tags_normalized_length = split_cs_tags(cs_tags)

# Step 3: Normalize the lengths of each read
cs_tags_normalized_length = normalize_read_lengths(cs_tags_normalized_length, starts)

return cs_tags_normalized_length


def get_consensus(cs_list: list[deque[str]]) -> str:
cs_consensus = []
for cs in zip(*cs_list):
Expand All @@ -150,30 +85,32 @@ def get_consensus(cs_list: list[deque[str]]) -> str:
###########################################################


def consensus(cs_tags: list[str], cigars: list[str], positions: list[int], prefix: bool = False) -> str:
def consensus(cs_tags: list[str], positions: list[int], prefix: bool = False) -> str:
"""generate consensus of cs tags
Args:
cs_tags (list): cs tags in the **long** format
cigars (list): cigars strings (6th column in SAM file)
positions (list): 1-based leftmost mapping position (4th column in SAM file)
prefix (bool, optional): Whether to add the prefix 'cs:Z:' to the cs tag. Defaults to False
Return:
str: a consensus of cs tag in the **long** format
Example:
>>> import cstag
>>> cs_tags = ["=ACGT", "=AC*gt=T", "=C*gt=T", "=C*gt=T", "=ACT+ccc=T"]
>>> cigars = ["4M","4M","1S3M", "3M", "3M3I1M"]
>>> positions = [6,6,6,7,6]
>>> cstag.consensus(cs_tags, cigars, positions)
>>> positions = [1,1,1,2,1]
>>> cstag.consensus(cs_tags, positions)
=AC*gt=T
"""
if not (len(cs_tags) == len(cigars) == len(positions) > 0):
if not (len(cs_tags) == len(positions) > 0):
raise ValueError("Element numbers of each argument must be the same")

for cs_tag in cs_tags:
validate_long_format(cs_tag)

cs_tags_normalized_length = preprocess_for_consensus(cs_tags, cigars, positions)
cs_tag_split = split_cs_tags(cs_tags)

positions_zero_indexed = [pos - 1 for pos in positions]

cs_tags_normalized_length = normalize_read_lengths(cs_tag_split, positions_zero_indexed)

cs_consensus = get_consensus(cs_tags_normalized_length)

Expand Down
52 changes: 11 additions & 41 deletions tests/test_consensus.py
Original file line number Diff line number Diff line change
@@ -1,37 +1,12 @@
from collections import deque
from src.cstag.consensus import (
extract_softclips,
calculate_read_starts,
split_cs_tags,
normalize_read_lengths,
get_consensus,
consensus,
)


def test_extract_softclips():
# Test with a variety of CIGAR strings
cigar_strings = ["4S3M", "3M4S", "10M", "5S5M5S", "5M"]
expected_output = [4, 0, 0, 5, 0]

assert extract_softclips(cigar_strings) == expected_output

# Test with empty CIGAR string
assert extract_softclips([""]) == [0]

# Test with no CIGAR strings
assert extract_softclips([]) == []


def test_calculate_read_starts():
# Test with a variety of POS and CIGAR strings
POS = [6, 8, 10]
CIGAR = ["4S3M", "3M", "3S2M"]
expected_output = [0 + 4, 2 + 0, 4 + 3]

assert calculate_read_starts(POS, CIGAR) == expected_output


def test_split_cs_tags():
# Test with a variety of cs tags
cs_tags = ["=ACGT", "=AC*gt=T", "=C~gt10ag=T", "=ACT+ccc=T"]
Expand Down Expand Up @@ -96,30 +71,26 @@ def test_substitution():
"=C*gt=T",
"=ACT+ccc=T",
]
CIGAR = ["4M", "4M", "1S3M", "3M", "3M3I1M"]
POS = [1, 1, 1, 2, 1]
assert consensus(CSTAG, CIGAR, POS) == "=AC*gt=T"
POS = [1, 1, 2, 2, 1]
assert consensus(CSTAG, POS) == "=AC*gt=T"


def test_insertion():
CSTAG = ["=ACGT", "=AC+ggggg=GT", "=C+ggggg=GT", "=C+ggggg=GT"]
CIGAR = ["4M", "2M5I1M", "1S5I3M", "1M5I1M"]
POS = [1, 1, 1, 2]
assert consensus(CSTAG, CIGAR, POS) == "=NC+ggggg=GT"
POS = [1, 1, 2, 2]
assert consensus(CSTAG, POS) == "=NC+ggggg=GT"


def test_deletion():
CSTAG = ["=ACGT", "=AC-ggggg=GT", "=C-ggggg=GT", "=C-ggggg=GT"]
CIGAR = ["4M", "2M5D1M", "1S5D3M", "1M5D1M"]
POS = [1, 1, 1, 2]
assert consensus(CSTAG, CIGAR, POS) == "=NC-ggggg=GT"
POS = [1, 1, 2, 2]
assert consensus(CSTAG, POS) == "=NC-ggggg=GT"


def test_softclip():
def test_positions():
CSTAG = ["=ACGT", "=CGT", "=GT"]
CIGAR = ["4M", "1S3M", "2S2M"]
POS = [1, 1, 1]
assert consensus(CSTAG, CIGAR, POS) == "=NCGT"
POS = [1, 2, 3]
assert consensus(CSTAG, POS) == "=NCGT"


def test_splicing():
Expand All @@ -129,6 +100,5 @@ def test_splicing():
"=C~gc100ag=T",
"=C~gc100ag=T",
]
CIGAR = ["4M", "2M100N1M", "1S100N3M", "1M100N1M"]
POS = [1, 1, 1, 2]
assert consensus(CSTAG, CIGAR, POS) == "=NC~gc100ag=T"
POS = [1, 1, 2, 2]
assert consensus(CSTAG, POS) == "=NC~gc100ag=T"

0 comments on commit 59bf22e

Please sign in to comment.