Skip to content

Commit

Permalink
Store latest changes
Browse files Browse the repository at this point in the history
  • Loading branch information
jarbesfeld committed Jan 24, 2025
1 parent 4e00a59 commit a722022
Show file tree
Hide file tree
Showing 4 changed files with 270 additions and 171 deletions.
169 changes: 1 addition & 168 deletions src/fusor/fusion_caller_models.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
"""Schemas for outputs provided by different fusion callers"""

import csv
from abc import ABC, abstractmethod
from abc import ABC
from enum import Enum
from pathlib import Path
from typing import Literal

from pydantic import BaseModel, ConfigDict, Field
Expand All @@ -29,46 +27,6 @@ class FusionCaller(ABC, BaseModel):
type: Caller
model_config = ConfigDict(extra="allow")

@staticmethod
def _does_file_exist(path: Path) -> None:
    """Verify that the fusions file is present on disk

    :param path: The path to the file
    :raise ValueError: if the file does not exist at the specified path
    """
    # Guard clause: nothing to do when the path is present.
    if path.exists():
        return
    raise ValueError(f"{path!s} does not exist")

@classmethod
def _process_fusion_caller_rows(
    cls,
    path: Path,
    column_rename: dict,
    delimeter: str,
) -> list["FusionCaller"]:
    """Convert rows of fusion caller output to Pydantic classes

    :param path: The path to the fusions file
    :param column_rename: A dictionary mapping column headers in the file to
        the keyword names passed to the class constructor; headers without an
        entry are passed through unchanged
    :param delimeter: The delimeter for the fusions file
    :raise ValueError: if the file does not exist at the specified path
    :return: A list of fusions, represented as Pydantic objects
    """
    cls._does_file_exist(path)
    # The csv module requires files opened with newline="" so that newlines
    # embedded inside quoted fields are passed through untouched.
    with path.open(newline="") as csvfile:
        reader = csv.DictReader(csvfile, delimiter=delimeter)
        return [
            cls(**{column_rename.get(key, key): value for key, value in row.items()})
            for row in reader
        ]

@classmethod
@abstractmethod
def load_records(cls, path: Path) -> list["FusionCaller"]:
    """Abstract method to load records from a fusion caller file.

    Declared as a classmethod so that the abstract signature agrees with the
    concrete overrides, which are all classmethods.

    :param path: The path to the fusion caller output file
    :return: A list of fusions, represented as Pydantic objects
    """


class JAFFA(FusionCaller):
"""Define parameters for JAFFA model"""
Expand Down Expand Up @@ -108,20 +66,6 @@ class JAFFA(FusionCaller):
description="The number of detected reads that align entirely on either side of the breakpoint",
)

@classmethod
def load_records(cls, path: Path) -> list["JAFFA"]:
    """Load fusions from JAFFA csv file

    :param path: The path to the file of JAFFA fusions
    :raise ValueError: propagated from ``_process_fusion_caller_rows`` when the
        file does not exist
    :return: A list of JAFFA objects
    """
    # Headers containing spaces are mapped onto snake_case keyword names.
    return cls._process_fusion_caller_rows(
        path,
        column_rename={
            "fusion genes": "fusion_genes",
            "spanning reads": "spanning_reads",
            "spanning pairs": "spanning_pairs",
        },
        delimeter=",",
    )


class STARFusion(FusionCaller):
"""Define parameters for STAR-Fusion model"""
Expand All @@ -147,23 +91,6 @@ class STARFusion(FusionCaller):
description="The number of RNA-seq fragments that encompass the fusion junction such that one read of the pair aligns to a different gene than the other paired-end read of that fragment (from STAR-Fusion documentation)",
)

@classmethod
def load_records(cls, path: Path) -> list["STARFusion"]:
    """Load fusions from STAR-Fusion tsv file

    :param path: The path to the file of STAR-Fusion fusions
    :raise ValueError: propagated from ``_process_fusion_caller_rows`` when the
        file does not exist
    :return: A list of STAR-Fusion objects
    """
    # CamelCase headers are mapped onto snake_case keyword names.
    return cls._process_fusion_caller_rows(
        path,
        column_rename={
            "LeftGene": "left_gene",
            "RightGene": "right_gene",
            "LeftBreakpoint": "left_breakpoint",
            "RightBreakpoint": "right_breakpoint",
            "JunctionReadCount": "junction_read_count",
            "SpanningFragCount": "spanning_frag_count",
        },
        delimeter="\t",
    )


class FusionCatcher(FusionCaller):
"""Define parameters for FusionCatcher model"""
Expand Down Expand Up @@ -197,25 +124,6 @@ class FusionCatcher(FusionCaller):
..., description="The inferred sequence around the fusion junction"
)

@classmethod
def load_records(cls, path: Path) -> list["FusionCatcher"]:
    """Load fusions from FusionCatcher txt file

    :param path: The path to the file of FusionCatcher fusions
    :raise ValueError: propagated from ``_process_fusion_caller_rows`` when the
        file does not exist
    :return: A list of FusionCatcher objects
    """
    # The file is read as tab-delimited; headers are mapped onto keyword names.
    return cls._process_fusion_caller_rows(
        path,
        column_rename={
            "Gene_1_symbol(5end_fusion_partner)": "five_prime_partner",
            "Gene_2_symbol(3end_fusion_partner)": "three_prime_partner",
            "Fusion_point_for_gene_1(5end_fusion_partner)": "five_prime_fusion_point",
            "Fusion_point_for_gene_2(3end_fusion_partner)": "three_prime_fusion_point",
            "Predicted_effect": "predicted_effect",
            "Spanning_unique_reads": "spanning_unique_reads",
            "Spanning_pairs": "spanning_reads",
            "Fusion_sequence": "fusion_sequence",
        },
        delimeter="\t",
    )


class Arriba(FusionCaller):
"""Define parameters for Arriba model"""
Expand Down Expand Up @@ -266,22 +174,6 @@ class Arriba(FusionCaller):
)
fusion_transcript: str = Field(..., description="The assembled fusion transcript")

@classmethod
def load_records(cls, path: Path) -> list["Arriba"]:
    """Load fusions from Arriba tsv file

    :param path: The path to the file of Arriba fusions
    :raise ValueError: propagated from ``_process_fusion_caller_rows`` when the
        file does not exist
    :return: A list of Arriba objects
    """
    # Only the headers listed here are renamed; all others pass through as-is.
    return cls._process_fusion_caller_rows(
        path,
        column_rename={
            "#gene1": "gene1",
            "strand1(gene/fusion)": "strand1",
            "strand2(gene/fusion)": "strand2",
            "type": "event_type",
            "reading_frame": "rf",
        },
        delimeter="\t",
    )


class Cicero(FusionCaller):
"""Define parameters for CICERO model"""
Expand Down Expand Up @@ -321,28 +213,6 @@ class Cicero(FusionCaller):
)
contig: str = Field(..., description="The assembled contig sequence for the fusion")

@classmethod
def load_records(cls, path: Path) -> list["Cicero"]:
    """Load fusions from Cicero txt file

    :param path: The path to the file of Cicero fusions
    :raise ValueError: propagated from ``_process_fusion_caller_rows`` when the
        file does not exist
    :return: A list of Cicero objects
    """
    # The A/B suffixed headers are mapped onto 5prime/3prime keyword names.
    return cls._process_fusion_caller_rows(
        path,
        column_rename={
            "geneA": "gene_5prime",
            "geneB": "gene_3prime",
            "chrA": "chr_5prime",
            "chrB": "chr_3prime",
            "posA": "pos_5prime",
            "posB": "pos_3prime",
            "type": "event_type",
            "readsA": "reads_5prime",
            "readsB": "reads_3prime",
            "coverageA": "coverage_5prime",
            "coverageB": "coverage_3prime",
        },
        delimeter="\t",
    )


class EnFusion(FusionCaller):
"""Define parameters for EnFusion model"""
Expand All @@ -362,24 +232,6 @@ class EnFusion(FusionCaller):
None, description="The sequence near the fusion junction"
)

@classmethod
def load_records(cls, path: Path) -> list["EnFusion"]:
    """Load fusions from EnFusion tsv file

    :param path: The path to the file of EnFusion fusions
    :raise ValueError: propagated from ``_process_fusion_caller_rows`` when the
        file does not exist
    :return: A list of EnFusion objects
    """
    # The 1/2 suffixed headers are mapped onto 5prime/3prime keyword names.
    return cls._process_fusion_caller_rows(
        path,
        column_rename={
            "Gene1": "gene_5prime",
            "Gene2": "gene_3prime",
            "Chr1": "chr_5prime",
            "Chr2": "chr_3prime",
            "Break1": "break_5prime",
            "Break2": "break_3prime",
            "FusionJunctionSequence": "fusion_junction_sequence",
        },
        delimeter="\t",
    )


class Genie(FusionCaller):
"""Define parameters for Genie model"""
Expand All @@ -395,22 +247,3 @@ class Genie(FusionCaller):
reading_frame: str = Field(
..., description="The reading frame status of the fusion"
)

@classmethod
def load_records(cls, path: Path) -> list["Genie"]:
    """Load fusions from Genie txt file

    :param path: The path to the file of Genie structural variants
    :raise ValueError: propagated from ``_process_fusion_caller_rows`` when the
        file does not exist
    :return: A list of Genie objects
    """
    # The file is read as tab-delimited; headers are mapped onto keyword names.
    return cls._process_fusion_caller_rows(
        path,
        column_rename={
            "Site1_Hugo_Symbol": "site1_hugo",
            "Site2_Hugo_Symbol": "site2_hugo",
            "Site1_Chromosome": "site1_chrom",
            "Site2_Chromosome": "site2_chrom",
            "Site1_Position": "site1_pos",
            "Site2_Position": "site2_pos",
            "Site2_Effect_On_Frame": "reading_frame",
            "Annotation": "annot",
        },
        delimeter="\t",
    )
166 changes: 166 additions & 0 deletions src/fusor/harvester.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
"""Harvester methods for output from different fusion callers"""

import csv
from abc import ABC
from pathlib import Path
from typing import ClassVar

from fusor.fusion_caller_models import (
JAFFA,
Arriba,
Cicero,
EnFusion,
FusionCaller,
FusionCatcher,
Genie,
STARFusion,
)


class FusionCallerHarvester(ABC):
    """ABC for fusion caller harvesters

    Subclasses supply three class-level settings that drive ``load_records``:

    * ``fusion_caller``: the class instantiated once per row of the file
    * ``column_rename``: mapping of column headers in the file to the keyword
      names expected by ``fusion_caller``
    * ``delimeter``: the field delimiter of the fusion caller's output file
    """

    # The class itself (not an instance) is stored, since it is called per row.
    fusion_caller: ClassVar["type[FusionCaller]"]
    column_rename: ClassVar[dict]
    delimeter: ClassVar[str]

    def load_records(
        self,
        fusion_path: Path,
    ) -> "list[FusionCaller]":
        """Convert rows of fusion caller output to Pydantic classes

        Each row's columns are renamed via ``column_rename``; columns whose
        (renamed) name is not an annotated field of ``fusion_caller`` are
        dropped before the object is built.

        :param fusion_path: The path to the fusions file
        :raise ValueError: if the file does not exist at the specified path
        :return: A list of fusions, represented as Pydantic objects
        """
        if not fusion_path.exists():
            statement = f"{fusion_path!s} does not exist"
            raise ValueError(statement)
        # NOTE(review): ``__annotations__`` lists only the fields annotated on
        # the class itself, not inherited ones -- confirm every model declares
        # the fields it needs directly.
        fields_to_keep = self.fusion_caller.__annotations__.keys()
        fusions_list = []
        # The csv module requires files opened with newline="" so that newlines
        # embedded inside quoted fields are passed through untouched.
        with fusion_path.open(newline="") as csvfile:
            reader = csv.DictReader(csvfile, delimiter=self.delimeter)
            for row in reader:
                filtered_row = {}
                for key, value in row.items():
                    renamed = self.column_rename.get(key, key)
                    if renamed in fields_to_keep:
                        filtered_row[renamed] = value
                fusions_list.append(self.fusion_caller(**filtered_row))
        return fusions_list


class JAFFAHarvester(FusionCallerHarvester):
    """Harvester settings for JAFFA output

    Rows are read as comma-delimited and built into ``JAFFA`` objects.
    """

    fusion_caller = JAFFA
    delimeter = ","
    # Input column header -> ``JAFFA`` field name
    column_rename: ClassVar[dict] = {
        "fusion genes": "fusion_genes",
        "spanning reads": "spanning_reads",
        "spanning pairs": "spanning_pairs",
    }


class StarFusionHarvester(FusionCallerHarvester):
    """Harvester settings for STAR-Fusion output

    Rows are read as tab-delimited and built into ``STARFusion`` objects.
    """

    fusion_caller = STARFusion
    delimeter = "\t"
    # Input column header -> ``STARFusion`` field name
    column_rename: ClassVar[dict] = {
        "LeftGene": "left_gene",
        "RightGene": "right_gene",
        "LeftBreakpoint": "left_breakpoint",
        "RightBreakpoint": "right_breakpoint",
        "JunctionReadCount": "junction_read_count",
        "SpanningFragCount": "spanning_frag_count",
    }


class FusionCatcherHarvester(FusionCallerHarvester):
    """Harvester settings for FusionCatcher output

    Rows are read as tab-delimited and built into ``FusionCatcher`` objects.
    """

    fusion_caller = FusionCatcher
    delimeter = "\t"
    # Input column header -> ``FusionCatcher`` field name
    column_rename: ClassVar[dict] = {
        "Gene_1_symbol(5end_fusion_partner)": "five_prime_partner",
        "Gene_2_symbol(3end_fusion_partner)": "three_prime_partner",
        "Fusion_point_for_gene_1(5end_fusion_partner)": "five_prime_fusion_point",
        "Fusion_point_for_gene_2(3end_fusion_partner)": "three_prime_fusion_point",
        "Predicted_effect": "predicted_effect",
        "Spanning_unique_reads": "spanning_unique_reads",
        "Spanning_pairs": "spanning_reads",
        "Fusion_sequence": "fusion_sequence",
    }


class ArribaHarvester(FusionCallerHarvester):
    """Harvester settings for Arriba output

    Rows are read as tab-delimited and built into ``Arriba`` objects.
    """

    fusion_caller = Arriba
    delimeter = "\t"
    # Input column header -> ``Arriba`` field name
    column_rename: ClassVar[dict] = {
        "#gene1": "gene1",
        "strand1(gene/fusion)": "strand1",
        "strand2(gene/fusion)": "strand2",
        "type": "event_type",
        "reading_frame": "rf",
    }


class CiceroHarvester(FusionCallerHarvester):
    """Harvester settings for Cicero output

    Rows are read as tab-delimited and built into ``Cicero`` objects.
    """

    fusion_caller = Cicero
    delimeter = "\t"
    # Input column header -> ``Cicero`` field name
    column_rename: ClassVar[dict] = {
        "geneA": "gene_5prime",
        "geneB": "gene_3prime",
        "chrA": "chr_5prime",
        "chrB": "chr_3prime",
        "posA": "pos_5prime",
        "posB": "pos_3prime",
        "type": "event_type",
        "readsA": "reads_5prime",
        "readsB": "reads_3prime",
        "coverageA": "coverage_5prime",
        "coverageB": "coverage_3prime",
    }


class EnFusionHarvester(FusionCallerHarvester):
    """Harvester settings for EnFusion output

    Rows are read as tab-delimited and built into ``EnFusion`` objects.
    """

    fusion_caller = EnFusion
    delimeter = "\t"
    # Input column header -> ``EnFusion`` field name
    column_rename: ClassVar[dict] = {
        "Gene1": "gene_5prime",
        "Gene2": "gene_3prime",
        "Chr1": "chr_5prime",
        "Chr2": "chr_3prime",
        "Break1": "break_5prime",
        "Break2": "break_3prime",
        "FusionJunctionSequence": "fusion_junction_sequence",
    }


class GenieHarvester(FusionCallerHarvester):
    """Harvester settings for Genie output

    Rows are read as tab-delimited and built into ``Genie`` objects.
    """

    fusion_caller = Genie
    delimeter = "\t"
    # Input column header -> ``Genie`` field name
    column_rename: ClassVar[dict] = {
        "Site1_Hugo_Symbol": "site1_hugo",
        "Site2_Hugo_Symbol": "site2_hugo",
        "Site1_Chromosome": "site1_chrom",
        "Site2_Chromosome": "site2_chrom",
        "Site1_Position": "site1_pos",
        "Site2_Position": "site2_pos",
        "Site2_Effect_On_Frame": "reading_frame",
        "Annotation": "annot",
    }
Loading

0 comments on commit a722022

Please sign in to comment.