Skip to content

Commit

Permalink
added type hints
Browse files Browse the repository at this point in the history
  • Loading branch information
Alex-Karmazin committed Jan 15, 2023
1 parent 3704dde commit 1d09603
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 34 deletions.
25 changes: 13 additions & 12 deletions coronary_ref_homo.py
Original file line number Diff line number Diff line change
@@ -1,49 +1,50 @@
import sqlite3
from sqlite3 import Error
from pathlib import Path


class CoronaryRefHomo:

rsid_map = {}
rsid_map:dict[str, dict] = {}

def init(self, reporter, sql_insert):
def init(self, reporter, sql_insert:str) -> None:
self.parent = reporter
self.sql_insert = sql_insert


def setup(self):
sql = "SELECT rsID, Risk_allele FROM coronary_disease WHERE Ref_allele = Risk_allele"
def setup(self) -> None:
sql:str = "SELECT rsID, Risk_allele FROM coronary_disease WHERE Ref_allele = Risk_allele"
self.parent.coronary_cursor.execute(sql)
rows = self.parent.coronary_cursor.fetchall()
rows:list = self.parent.coronary_cursor.fetchall()
for rsid, risk_allele in rows:
self.rsid_map[rsid] = {'exist':True, 'risk':risk_allele}


def process_row(self, row):
rsid = str(row['dbsnp__rsid'])
rsid:str = str(row['dbsnp__rsid'])
if rsid == '':
return

if not rsid.startswith('rs'):
rsid = "rs"+rsid

item = self.rsid_map.get(rsid)
item:dict = self.rsid_map.get(rsid)
if item:
self.rsid_map[rsid]['exist'] = False


def end(self):
def end(self) -> None:
for rsid in self.rsid_map:
if self.rsid_map[rsid]['exist']:
risk = self.rsid_map[rsid]['risk']
risk:str = self.rsid_map[rsid]['risk']
risk = risk+risk

query = "SELECT Risk_allele, Gene, Genotype, Conclusion, Weight, PMID, Population, GWAS_study_design, P_value " \
query:str = "SELECT Risk_allele, Gene, Genotype, Conclusion, Weight, PMID, Population, GWAS_study_design, P_value " \
f"FROM coronary_disease WHERE rsID = '{rsid}' AND Genotype = '{risk}';"

self.parent.coronary_cursor.execute(query)
row = self.parent.coronary_cursor.fetchone()
row:list = self.parent.coronary_cursor.fetchone()
if row:
task = (rsid, row[1], row[0], row[0]+"/"+row[0], row[3], row[4], row[5], row[6], row[7], row[8],
task:tuple = (rsid, row[1], row[0], row[0]+"/"+row[0], row[3], row[4], row[5], row[6], row[7], row[8],
self.parent.get_color(row[4], 0.6))
self.parent.longevity_cursor.execute(self.sql_insert, task)
44 changes: 22 additions & 22 deletions just_coronary.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@


class CravatPostAggregator (BasePostAggregator):
sql_insert = """ INSERT INTO coronary (
sql_insert:str = """ INSERT INTO coronary (
rsid,
gene,
risk,
Expand All @@ -21,23 +21,23 @@ class CravatPostAggregator (BasePostAggregator):
pvalue,
weightcolor
) VALUES (?,?,?,?,?,?,?,?,?,?,?) """
ref_homo = coronary_ref_homo.CoronaryRefHomo()
ref_homo:coronary_ref_homo.CoronaryRefHomo = coronary_ref_homo.CoronaryRefHomo()

def check(self):
return True

def setup (self):
self.ref_homo.init(self, self.sql_insert)
modules_path = str(Path(__file__).parent)
sql_file = modules_path + "/data/coronary.sqlite"
modules_path:str = str(Path(__file__).parent)
sql_file:str = modules_path + "/data/coronary.sqlite"
if Path(sql_file).exists():
self.coronary_conn = sqlite3.connect(sql_file)
self.coronary_cursor = self.coronary_conn.cursor()
self.coronary_conn:sqlite3.Connection = sqlite3.connect(sql_file)
self.coronary_cursor:sqlite3.Cursor = self.coronary_conn.cursor()

self.result_path = Path(self.output_dir, self.run_name + "_longevity.sqlite")
self.longevity_conn = sqlite3.connect(self.result_path)
self.longevity_cursor = self.longevity_conn.cursor()
sql_create = """ CREATE TABLE IF NOT EXISTS coronary (
self.result_path:Path = Path(self.output_dir, self.run_name + "_longevity.sqlite")
self.longevity_conn:sqlite3.Connection = sqlite3.connect(self.result_path)
self.longevity_cursor:sqlite3.Cursor = self.longevity_conn.cursor()
sql_create:str = """ CREATE TABLE IF NOT EXISTS coronary (
id integer NOT NULL PRIMARY KEY,
rsid text,
gene text,
Expand Down Expand Up @@ -70,14 +70,14 @@ def cleanup (self):
return


def get_color(self, w, scale = 1.5):
def get_color(self, w:float, scale:float = 1.5) -> str:
w = float(w)
if w < 0:
w = w * -1
w = 1 - w * scale
if w < 0:
w = 0
color = format(int(w * 255), 'x')
color:str = format(int(w * 255), 'x')
if len(color) == 1:
color = "0" + color
color = "ff" + color + color
Expand All @@ -94,7 +94,7 @@ def get_color(self, w, scale = 1.5):


def annotate (self, input_data):
rsid = str(input_data['dbsnp__rsid'])
rsid:str = str(input_data['dbsnp__rsid'])
if rsid == '':
return

Expand All @@ -103,30 +103,30 @@ def annotate (self, input_data):
if not rsid.startswith('rs'):
rsid = "rs" + rsid

alt = input_data['base__alt_base']
ref = input_data['base__ref_base']
alt:str = input_data['base__alt_base']
ref:str = input_data['base__ref_base']

query = "SELECT Risk_allele, Gene, Genotype, Conclusion, Weight, PMID, Population, GWAS_study_design, P_value " \
query:str = "SELECT Risk_allele, Gene, Genotype, Conclusion, Weight, PMID, Population, GWAS_study_design, P_value " \
f"FROM coronary_disease WHERE rsID = '{rsid}';"

self.coronary_cursor.execute(query)
rows = self.coronary_cursor.fetchall()
rows:list[Any] = self.coronary_cursor.fetchall()

if len(rows) == 0:
return

zygot = input_data['vcfinfo__zygosity']
genome = alt + "/" + ref
gen_set = {alt, ref}
zygot:str = input_data['vcfinfo__zygosity']
genome:str = alt + "/" + ref
gen_set:set[str] = {alt, ref}
if zygot == 'hom':
genome = alt + "/" + alt
gen_set = {alt, alt}
for row in rows:
allele = row[0]
row_gen = {row[2][0], row[2][1]}
row_gen:set[str] = {row[2][0], row[2][1]}

if gen_set == row_gen:
task = (rsid, row[1], allele, genome, row[3], float(row[4]), row[5], row[6], row[7],
task:tuple = (rsid, row[1], allele, genome, row[3], float(row[4]), row[5], row[6], row[7],
row[8], self.get_color(row[4], 0.6))
self.longevity_cursor.execute(self.sql_insert, task)

Expand Down

0 comments on commit 1d09603

Please sign in to comment.