Skip to content

Commit

Permalink
Merge pull request #19 from akikuno:develop-v1.0.1
Browse files Browse the repository at this point in the history
Develop-v1.0.1
  • Loading branch information
akikuno authored Oct 3, 2023
2 parents d0ba0e0 + 1bffb88 commit bac87ad
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 22 deletions.
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "poetry.core.masonry.api"

[tool.poetry]
name = "cstag"
version = "1.0.0"
version = "1.0.1"
description = "Python module to manipulate the minimap2's CS tag"
authors = ["Akihiro Kuno <[email protected]>"]
homepage = "https://github.com/akikuno/cstag"
Expand Down
53 changes: 45 additions & 8 deletions src/cstag/consensus.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,22 @@
from cstag.utils.validator import validate_cs_tag, validate_long_format


def split_deletion(cs_tag: str) -> list[str]:
    """
    Break a deletion tag into one tag per deleted nucleotide.

    Args:
        cs_tag (str): A single CS tag element such as "-acg", "*ag" or "=ACGT".

    Returns:
        list[str]: One "-x" entry per nucleotide when cs_tag consists solely of
        a hyphen followed by lowercase a/c/g/t/n characters (e.g. "-acg" gives
        ["-a", "-c", "-g"]); otherwise a one-element list holding cs_tag as is.
    """
    found = re.match(r"-(?P<nucleotides>[acgtn]+)$", cs_tag)
    # Anything that is not a pure deletion tag is passed through untouched
    if found is None:
        return [cs_tag]
    return ["-" + base for base in found.group("nucleotides")]


def expand_deletion_tags(tags_combined: list[str]) -> list[str]:
    """
    Expand every deletion tag in a list into single-nucleotide deletion tags.

    Args:
        tags_combined (list[str]): CS tag elements, each starting with its operator symbol.

    Returns:
        list[str]: The same elements in the same order, with each element
        replaced by whatever split_deletion returns for it.
    """
    per_tag_pieces = (split_deletion(tag) for tag in tags_combined)
    # Flatten the per-tag lists into a single list, preserving order
    return list(chain.from_iterable(per_tag_pieces))


def split_cs_tags(cs_tags: list[str]) -> list[list[str]]:
"""
Split and process each CS tag in cs_tags.
Expand All @@ -21,16 +37,20 @@ def split_cs_tags(cs_tags: list[str]) -> list[list[str]]:
for cs_tag in cs_tags:
# Remove the prefix "cs:Z:" if present
cs_tag = cs_tag.replace("cs:Z:", "")

# Split the CS tag using special symbols (-, *, ~, =)
split_tags = re.split(r"([-*~=])", cs_tag)[1:]
# insertion symbol (+) is ignored because it is not observed in reference sequence
tags_splitted = re.split(r"([-*~=])", cs_tag)[1:]
# Combine the symbol with the corresponding sequence
combined_tags = [symbol + seq for symbol, seq in zip(split_tags[0::2], split_tags[1::2])]
tags_combined = [symbol + seq for symbol, seq in zip(tags_splitted[0::2], tags_splitted[1::2])]
tags_combined = expand_deletion_tags(tags_combined)

# Remove the "=" symbols, as they are not needed for further processing
cleaned_tags = [tag.replace("=", "") for tag in combined_tags]
cleaned_tags = [tag.replace("=", "") for tag in tags_combined]
# Further split the tags by the base letters (A, C, G, T)
further_split_tags = [re.split(r"(?=[ACGT])", tag) for tag in cleaned_tags]
further_tags_splitted = [re.split(r"(?=[ACGT])", tag) for tag in cleaned_tags]
# Remove any empty strings generated by the split
non_empty_tags = [[elem for elem in tag if elem] for tag in further_split_tags]
non_empty_tags = [[elem for elem in tag if elem] for tag in further_tags_splitted]
# Flatten the list of lists into a single list
flat_tags = list(chain.from_iterable(non_empty_tags))
cs_tags_splitted.append(flat_tags)
Expand All @@ -47,7 +67,7 @@ def normalize_positions(positions: list[int]) -> list[int]:

def normalize_read_lengths(cs_tags: list[str], positions: list[int]) -> list[list[str]]:
"""
Normalize the lengths of each read in cs_tags based on their starts positions.
Normalize the lengths of each read in cs_tags based on their start positions. If a read is too short, it is padded with `None`.
Args:
cs_tags (list[str]): list of CS tags.
Expand All @@ -63,16 +83,32 @@ def normalize_read_lengths(cs_tags: list[str], positions: list[int]) -> list[lis

for i, pos in enumerate(positions_normalized):
if pos > 0:
cs_tags_deque[i].extendleft(["N"] * pos)
cs_tags_deque[i].extendleft([None] * pos)
if len(cs_tags_deque[i]) < cs_maxlen:
cs_tags_deque[i].extend(["N"] * (cs_maxlen - len(cs_tags_deque[i])))
cs_tags_deque[i].extend([None] * (cs_maxlen - len(cs_tags_deque[i])))
cs_tags = [list(cs) for cs in cs_tags_deque]
return cs_tags


def condense_deletions(s: str) -> str:
    """
    Merge each run of consecutive single-nucleotide deletions into one deletion tag.

    Args:
        s (str): A CS tag string in which deletions appear as "-a-c-g".

    Returns:
        str: The same string with every such run rewritten as a single
        hyphen followed by the nucleotides, e.g. "-a-c-g" becomes "-acg".
    """

    def merge_run(run) -> str:
        # Keep one leading hyphen and drop the hyphens inside the run
        return "-" + run.group(0).replace("-", "")

    # A run is one or more back-to-back "-x" units, x being a lowercase a/c/g/t/n
    return re.sub(r"(-[acgtn])+", merge_run, s)


def get_consensus(cs_tags: list[list[str]]) -> str:
cs_consensus = []
for cs in zip(*cs_tags):
# Remove the None that is compensating for the insufficient read length.
cs = [c for c in cs if c]
# Get the most common CS tag(s)
most_common_tags = Counter(cs).most_common()

Expand All @@ -87,6 +123,7 @@ def get_consensus(cs_tags: list[list[str]]) -> str:
cs_consensus.append(tag)

cs_consensus = "".join(cs_consensus)
cs_consensus = condense_deletions(cs_consensus)
# Append "=" to [ACGTN]
return re.sub(r"([ACGTN]+)", r"=\1", cs_consensus)

Expand Down
4 changes: 0 additions & 4 deletions src/cstag/to_html.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,21 +33,18 @@
border: 0.1em solid;
background-color: #ee827c;
font-weight: bold;
# border-radius: 5px;
}
.Del {
color: #333;
border: 0.1em solid;
background-color: #a0d8ef;
font-weight: bold;
# border-radius: 5px;
}
.Sub {
color: #333;
border: 0.1em solid;
background-color: #98d98e;
font-weight: bold;
# border-radius: 5px;
}
.Splice {
color: #333;
Expand All @@ -60,7 +57,6 @@
border: 0.1em solid;
background-color: #c0c6c9;
font-weight: bold;
# border-radius: 5px;
}
</style>
Expand Down
18 changes: 9 additions & 9 deletions tests/test_consensus.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@ def test_normalize_read_lengths():
cs_tags = ["=ACGT", "=AC", "=GT"]
starts = [0, 2, 4]
expected_output = [
(["A", "C", "G", "T", "N", "N"]),
(["N", "N", "A", "C", "N", "N"]),
(["N", "N", "N", "N", "G", "T"]),
(["A", "C", "G", "T", None, None]),
([None, None, "A", "C", None, None]),
([None, None, None, None, "G", "T"]),
]

assert normalize_read_lengths(cs_tags, starts) == expected_output
Expand Down Expand Up @@ -76,15 +76,15 @@ def test_substitution():


def test_insertion():
CSTAG = ["=ACGT", "=AC+ggggg=GT", "=C+ggggg=GT", "=C+ggggg=GT"]
CSTAG = ["=ACGT", "=AC+acgt=GT", "=C+acgt=GT", "=C+acgt=GT"]
POS = [1, 1, 2, 2]
assert consensus(CSTAG, POS) == "=NC+ggggg=GT"
assert consensus(CSTAG, POS) == "=AC+acgt=GT"


def test_deletion():
CSTAG = ["=ACGT", "=AC-ggggg=GT", "=C-ggggg=GT", "=C-ggggg=GT"]
CSTAG = ["=ACGT", "=AC-acgt=GT", "=C-acgt=GT", "=C-acgt=GT"]
POS = [1, 1, 2, 2]
assert consensus(CSTAG, POS) == "=NC-ggggg=GT"
assert consensus(CSTAG, POS) == "=AC-acgt=GT"


def test_splicing():
Expand All @@ -95,13 +95,13 @@ def test_splicing():
"=C~gc100ag=T",
]
POS = [1, 1, 2, 2]
assert consensus(CSTAG, POS) == "=NC~gc100ag=T"
assert consensus(CSTAG, POS) == "=AC~gc100ag=T"


def test_positions():
CSTAG = ["=ACGT", "=CGT", "=GT"]
POS = [1, 2, 3]
assert consensus(CSTAG, POS) == "=NCGT"
assert consensus(CSTAG, POS) == "=ACGT"


def test_positions_more_than_one():
Expand Down

0 comments on commit bac87ad

Please sign in to comment.