-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #14 from akikuno:develop-v0.5.1
Develop-v0.5.1
- Loading branch information
Showing
20 changed files
with
510 additions
and
232 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -4,9 +4,10 @@ build-backend = "setuptools.build_meta" | |
|
||
[project] | ||
name = "cstag" | ||
version = "0.5.0" | ||
version = "0.5.1" | ||
description = "Python module to manipulate the minimap2's CS tag" | ||
authors = [{ name = "Akihiro Kuno", email = "[email protected]" }] | ||
requires-python = ">=3.7" | ||
readme = { file = "README.md", content-type = "text/markdown" } | ||
license = { file = "LICENSE" } | ||
|
||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,68 +1,162 @@ | ||
from __future__ import annotations | ||
|
||
import re | ||
from itertools import chain | ||
from collections import deque, Counter | ||
|
||
from cstag.utils.validator import validate_long_format | ||
|
||
|
||
def extract_softclips(cigars: list[str]) -> list[int]: | ||
""" | ||
Extract the length of softclips from each cigars string. | ||
def consensus(CSTAG: list, CIGAR: list, POS: list) -> str: | ||
"""generate consensus of cs tags | ||
Args: | ||
CSTAG (list): cs tags in the **long** format | ||
CIGAR (list): CIGAR strings (6th column in SAM file) | ||
POS (list): 1-based leftmost mapping position (4th column in SAM file) | ||
Return: | ||
str: a consensus of cs tag in the **long** format | ||
Example: | ||
>>> import cstag | ||
>>> cs_list = ["cs:Z:=ACGT", "cs:Z:=AC*gt=T", "cs:Z:=C*gt=T", "cs:Z:=C*gt=T", "cs:Z:=ACT+ccc=T"] | ||
>>> cigar_list = ["4M","4M","1S3M", "3M", "3M3I1M"] | ||
>>> pos_list = [6,6,6,7,6] | ||
>>> cstag.consensus(cs_list, cigar_list, pos) | ||
cs:Z:=AC*gt*T | ||
cigars (list[str]): list of cigars strings. | ||
Returns: | ||
list[int]: list of softclip lengths for each cigars string. | ||
""" | ||
if not (len(CSTAG) == len(CIGAR) == len(POS)): | ||
raise Exception("Error: Element numbers of each argument must be the same") | ||
softclip_lengths = [] | ||
for cigar in cigars: | ||
# Check if the cigars string starts with a softclip (e.g., "4S3M") | ||
if re.match(r"^[0-9]+S", cigar): | ||
# Extract the length of the softclip using regex and convert it to an integer | ||
softclip_length = int(re.sub(r"^([0-9]+)S.*", r"\1", cigar)) | ||
else: | ||
# No softclip, so the length is 0 | ||
softclip_length = 0 | ||
softclip_lengths.append(softclip_length) | ||
return softclip_lengths | ||
|
||
if not all(re.search(r"[ACGT]", cs) for cs in CSTAG): | ||
raise Exception("Error: cs tag must be a long format") | ||
|
||
pos_min = min(POS) | ||
pos = [pos - pos_min for pos in POS] | ||
def calculate_read_starts(positions: list[int], cigars: list[str]) -> list[int]: | ||
""" | ||
Calculate the start positions of each read based on positions and cigars strings. | ||
softclips = [re.sub(r"^([0-9]+)S.*", r"\1", cigar) for cigar in CIGAR] | ||
softclips = [int(s) if s.isdigit() else 0 for s in softclips] | ||
Args: | ||
positions (list[int]): 1-based leftmost mapping positions. | ||
cigars (list[str]): cigars strings indicating the mapping of each read. | ||
starts = [p + s for p, s in zip(pos, softclips)] | ||
Returns: | ||
list[int]: Calculated start positions for each read. | ||
""" | ||
pos_min = min(positions) | ||
pos_offsets = [pos - pos_min for pos in positions] | ||
softclips = extract_softclips(cigars) | ||
starts = [p + s for p, s in zip(pos_offsets, softclips)] | ||
return starts | ||
|
||
|
||
def split_cs_tags(cs_tags: list[str]) -> list[deque[str]]:
    """
    Break every cs tag into a deque of per-column elements.

    Each tag is cut at the operators ``-``, ``*``, ``~`` and ``=``. Matched
    stretches (``=``) are then cut again in front of every upper-case base,
    so each matched base becomes its own element. ``+`` is not a cut point,
    which keeps an insertion attached to the base that precedes it
    (e.g. ``"T+ccc"``).

    Args:
        cs_tags (list[str]): cs tags in the long format, with or without the
            ``cs:Z:`` prefix.

    Returns:
        list[deque[str]]: one deque of elements per input tag, in input order.
    """
    per_read = []
    for raw_tag in cs_tags:
        body = raw_tag.replace("cs:Z:", "")
        # The capturing group keeps the operators in the result; the leading
        # piece (whatever precedes the first operator) is discarded.
        pieces = re.split(r"([-*~=])", body)[1:]
        elements = deque()
        for operator, sequence in zip(pieces[0::2], pieces[1::2]):
            # "=" carries no information once the bases are split out.
            segment = (operator + sequence).replace("=", "")
            # Zero-width split in front of each A/C/G/T; drop empty leftovers.
            elements.extend(part for part in re.split(r"(?=[ACGT])", segment) if part)
        per_read.append(elements)
    return per_read
|
||
|
||
def normalize_read_lengths(cs_list: list[deque[str]], starts: list[int]) -> list[deque[str]]: | ||
""" | ||
Normalize the lengths of each read in cs_list based on their starts positions. | ||
cs_list = [] | ||
for cs in CSTAG: | ||
cs = cs.replace("cs:Z:", "") | ||
cs = re.split(r"([-*~=])", cs)[1:] | ||
cs = [i + j for i, j in zip(cs[0::2], cs[1::2])] | ||
cs = [c.replace("=", "") for c in cs] | ||
cs = [re.split(r"(?=[ACGT])", c) for c in cs] | ||
cs = [list(filter(None, c)) for c in cs] | ||
cs = list(chain.from_iterable(cs)) | ||
cs_list.append(deque(cs)) | ||
Args: | ||
cs_list (list[deque[str]]): list of deques representing the reads. | ||
starts (list[int]): Starting positions of each read. | ||
Returns: | ||
list[deque[str]]: list of deques representing the reads, now normalized to the same length. | ||
""" | ||
cs_maxlen = max(len(cs) + start for cs, start in zip(cs_list, starts)) | ||
for i, (cs, start) in enumerate(zip(cs_list, starts)): | ||
if start: | ||
|
||
for i, start in enumerate(starts): | ||
if start > 0: | ||
cs_list[i].extendleft(["N"] * start) | ||
if len(cs_list[i]) < cs_maxlen: | ||
cs_list[i].extend(["N"] * (cs_maxlen - len(cs_list[i]))) | ||
|
||
def get_consensus(cs: tuple) -> str: | ||
""" | ||
When it is multimodal, return the first **mutated** mode encountered | ||
""" | ||
mostcommon = Counter(cs).most_common(1) | ||
if len(mostcommon) == 1: | ||
return mostcommon[0][0] | ||
for key, val in mostcommon: | ||
if not re.search(r"[ACGT]", key): | ||
return key | ||
|
||
cs_consensus = [get_consensus(cs) for cs in list(zip(*cs_list))] | ||
return cs_list | ||
|
||
|
||
def get_consensus(cs_list: list[deque[str]]) -> str:
    """
    Build the consensus cs tag (long format, without prefix) column by column.

    The reads in ``cs_list`` are walked in lockstep with ``zip``, so every
    column holds one element per read. Exactly one element is emitted per
    column:

    * if a single element is the most frequent, it is chosen;
    * if several elements tie for the highest count (multimodal), the first
      tied element that contains no upper-case ``A``/``C``/``G``/``T`` (i.e. a
      mutated element) is chosen;
    * if none of the tied elements is mutated, the first tied element is
      chosen so that the column is never dropped.

    Args:
        cs_list (list[deque[str]]): per-read sequences of cs elements. ``zip``
            stops at the shortest read, so reads are expected to have been
            padded to a common length beforehand.

    Returns:
        str: the consensus cs tag in the long format; every run of
        ``A``/``C``/``G``/``T``/``N`` is prefixed with ``=``.
    """
    consensus_elements = []
    for column in zip(*cs_list):
        # most_common() sorts by count; equal counts keep first-seen order.
        ranked = Counter(column).most_common()
        top_count = ranked[0][1]
        # Only elements sharing the highest count are candidates.
        modes = [tag for tag, count in ranked if count == top_count]
        # Prefer the first mutated mode; otherwise fall back to the first mode.
        chosen = next((tag for tag in modes if not re.search(r"[ACGT]", tag)), modes[0])
        consensus_elements.append(chosen)

    # Append "=" to [ACGTN]
    return re.sub(r"([ACGTN]+)", r"=\1", "".join(consensus_elements))
|
||
|
||
########################################################### | ||
# main | ||
########################################################### | ||
|
||
|
||
def consensus(cs_tags: list[str], cigars: list[str], positions: list[int], prefix: bool = False) -> str: | ||
"""generate consensus of cs tags | ||
Args: | ||
cs_tags (list): cs tags in the **long** format | ||
cigars (list): cigars strings (6th column in SAM file) | ||
positions (list): 1-based leftmost mapping position (4th column in SAM file) | ||
prefix (bool, optional): Whether to add the prefix 'cs:Z:' to the cs tag. Defaults to False | ||
Return: | ||
str: a consensus of cs tag in the **long** format | ||
Example: | ||
>>> import cstag | ||
>>> cs_tags = ["=ACGT", "=AC*gt=T", "=C*gt=T", "=C*gt=T", "=ACT+ccc=T"] | ||
>>> cigars = ["4M","4M","1S3M", "3M", "3M3I1M"] | ||
>>> positions = [6,6,6,7,6] | ||
>>> cstag.consensus(cs_tags, cigars, positions) | ||
=AC*gt=T | ||
""" | ||
if not (len(cs_tags) == len(cigars) == len(positions) > 0): | ||
raise ValueError("Element numbers of each argument must be the same") | ||
|
||
for cs_tag in cs_tags: | ||
validate_long_format(cs_tag) | ||
|
||
# Calculate the starts positions for each read | ||
starts = calculate_read_starts(positions, cigars) | ||
|
||
cs_list = split_cs_tags(cs_tags) | ||
|
||
# Normalize the lengths of each read | ||
cs_list = normalize_read_lengths(cs_list, starts) | ||
|
||
cs_consensus = get_consensus(cs_list) | ||
|
||
return "cs:Z:" + re.sub(r"([ACGTN]+)", r"=\1", cs_consensus) | ||
return f"cs:Z:{cs_consensus}" if prefix else cs_consensus |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.