Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

EVA-3753: load ENA project to EVAPRO #241

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file.
191 changes: 191 additions & 0 deletions eva_submission/evapro/find_from_ena.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
import re
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you think there's anything in any of this code that we (or ENA) might not want to make public? Just mentioning as the previous script was in a private repo...

from functools import cached_property
import xml.etree.ElementTree as ET

import oracledb
from ebi_eva_common_pyutils.config import cfg

class EnaProjectFinder():

@cached_property
def era_connection(self):
era_cred = cfg.query('ena', 'ERA')
return oracledb.connect(user=era_cred.get('username'),
password=era_cred.get('password'),
dsn=f"{era_cred.get('host')}:{era_cred.get('port')}/{era_cred.get('database')}")

def era_cursor(self):
return self.era_connection.cursor()

@staticmethod
def _parse_study_description_from_xml(study_xml):
root = ET.fromstring(study_xml)

# Extract Study Description
doc = root.find(".//STUDY_DESCRIPTION")
if doc:
return doc.text
else:
return ''

@staticmethod
def _parse_actions_and_alias_from_submission_xml(submission_xml):
root = ET.fromstring(submission_xml)
submission = root.find(".//SUBMISSION")
submission_alias = submission.attrib.get("alias") if submission is not None else None

actions = []
hold_date = None
for action in root.findall(".//ACTION"):
for child in action:
action_type = child.tag
if action_type == "HOLD":
hold_date = child.attrib.get("HoldUntilDate")
continue
schema = child.attrib.get("schema")
source = child.attrib.get("source")
actions.append({"type": action_type, "schema": schema, "source": source})
# Prioritise actions -- Select project if present then analysis then something else
project_action = [action for action in actions if action['schema'] == 'project']
analysis_action = [action for action in actions if action['schema'] == 'analysis']
other_actions = [action for action in actions if action['schema'] not in ['project', 'analysis']]
if project_action:
action = project_action[0]
elif analysis_action:
action = analysis_action[0]
elif other_actions:
action = other_actions[0]
else:
action = None
return submission_alias, hold_date, action

@staticmethod
def _parse_submission_from_xml(submission_xml):
root = ET.fromstring(submission_xml)

# Extract Study Description
doc = root.find(".//STUDY_DESCRIPTION")
if doc:
return doc.text
else:
return ''

def _parse_analysis_description_and_type_from_xml(self, analysis_xml):
root = ET.fromstring(analysis_xml)
description = root.find(".//DESCRIPTION").text if root.find(".//DESCRIPTION") is not None else None

# Extract Analysis Type and associated elements
analysis_type_element = root.find(".//ANALYSIS_TYPE")
platforms = set()
experiment_types = set()

if analysis_type_element is not None:
# Get the first child of ANALYSIS_TYPE (e.g., SEQUENCE_VARIATION)
for child in analysis_type_element:
# Extract associated elements
for sub_element in child:
if sub_element.tag == 'EXPERIMENT_TYPE':
experiment_types.add(sub_element.text)
if sub_element.tag == 'PLATFORM':
platforms.add(sub_element.text)
return description, experiment_types, platforms

def find_project_from_ena_database(self, project_accession):
era_project_query = (
'select s.study_id, project_id, s.submission_id, p.center_name, p.project_alias, s.study_type, '
'p.first_created, p.project_title, p.tax_id, p.scientific_name, p.common_name, '
'xmltype.getclobval(STUDY_XML) study_xml, xmltype.getclobval(PROJECT_XML) project_xml '
'from era.PROJECT p '
'left outer join era.STUDY s using(project_id) '
f"where project_id='{project_accession}'"
)
with self.era_cursor() as cursor:
results = list(cursor.execute(era_project_query))
assert len(results) == 1, f'{len(results)} project accession found in ERA for {project_accession}'
(
study_id, project_accession, submission_id, center_name, project_alias, study_type,
first_created, project_title, taxonomy_id, scientific_name, common_name, study_xml, project_xml
) = results[0]
study_description = self._parse_study_description_from_xml(str(study_xml))
# Project publication used to be parsed from the project XML. but there were none in the EVA Projects
# project_publications ['PROJECT_XML/PUBLICATIONS/PUBLICATION/PUBLICATION_LINKS/PUBLICATION_LINK/[DB:ID]']
return (
study_id, project_accession, submission_id, center_name, project_alias, study_type, first_created,
str(project_title), taxonomy_id, scientific_name, common_name, study_description
)

def find_parent_project(self, project_accession):
# link_type=2 == project
# link_role=1 == hierarchical
era_linked_project_query = (f"select to_id from era.ena_link "
f"where TO_LINK_TYPE_ID=2 AND LINK_ROLE_ID=1 AND from_id='{project_accession}'")
with self.era_cursor() as cursor:
parent_project = [to_id for to_id, in cursor.execute(era_linked_project_query)]
if not parent_project:
return None
elif len(parent_project) > 1:
raise RuntimeError(f"Multiple parent projects found for project accession {project_accession}")
else:
return parent_project[0]

def find_ena_submission(self, project_accession):
era_submission_query = (
"select submission.submission_id, xmltype.getclobval(SUBMISSION_XML) submission_xml, "
"submission.last_updated UPDATED "
"from era.submission "
"left outer join era.study on submission.submission_id=study.submission_id "
f"where study.project_id='{project_accession}' and study.submission_id like 'ERA%'"
)
with self.era_cursor() as cursor:
for results in cursor.execute(era_submission_query):
(submission_id, submission_xml, last_updated) = results
alias, hold_date, action = self._parse_actions_and_alias_from_submission_xml(str(submission_xml))
yield submission_id, alias, last_updated, hold_date, action

def find_analysis_in_ena(self, project_accession):
era_analysis_query = (
'select t.analysis_id, t.analysis_title, t.analysis_alias, t.analysis_type, t.center_name, t.first_created, '
' xmltype.getclobval(t.ANALYSIS_XML) analysis_xml, x.assembly assembly, x.refname refname, y.custom '
'from era.analysis t '
"left outer join XMLTABLE('/ANALYSIS_SET//ANALYSIS_TYPE//SEQUENCE_VARIATION//ASSEMBLY//STANDARD' passing t.analysis_xml columns assembly varchar2(2000) path \'@accession\', refname varchar2(2000) path \'@refname\') x on (1=1) "
"left outer join XMLTABLE('/ANALYSIS_SET//ANALYSIS_TYPE//SEQUENCE_VARIATION//ASSEMBLY//CUSTOM//URL_LINK' passing t.analysis_xml columns custom varchar2(2000) path \'URL\') y on (1=1) "
"where t.status_id <> 5 and lower(SUBMISSION_ACCOUNT_ID) in ('webin-1008') "
f"and (study_id in (select study_id from era.study where project_id='{project_accession}') "
f"or study_id='{project_accession}' or bioproject_id='{project_accession}')"
)
with self.era_cursor() as cursor:
for results in cursor.execute(era_analysis_query):
(
analysis_id, analysis_title, analysis_alias, analysis_type, center_name, first_created,
analysis_xml, assembly, refname, custom
) = results
analysis_description, experiment_types, platforms = self._parse_analysis_description_and_type_from_xml(str(analysis_xml))
if analysis_type != 'SEQUENCE_VARIATION':
continue
yield (
analysis_id, analysis_title, analysis_alias, analysis_description, analysis_type, center_name,
first_created, assembly, refname, custom, experiment_types, platforms
)


def find_samples_in_ena(self, analysis_accession):
query = ("select s.sample_id, s.biosample_id from era.analysis_sample asa "
f"join era.sample s on asa.sample_id = s.sample_id where analysis_id='{analysis_accession}'")
with self.era_cursor() as cursor:
for sample_id, sample_accession in cursor.execute(query):
yield sample_id, sample_accession

def find_files_in_ena(self, analysis_accession):
query = (
"select distinct asf.analysis_id as analysis_accession, wf.submission_file_id as submission_file_id, "
"regexp_substr(wf.data_file_path, \'[^/]*$\') as filename,"
"wf.checksum as file_md5, wf.data_file_format as file_type, ana.status_id "
"from era.analysis_submission_file asf "
"join era.webin_file wf on asf.analysis_id=wf.data_file_owner_id "
f"join era.analysis ana on asf.analysis_id=ana.analysis_id where asf.analysis_id='{analysis_accession}'"
)

with self.era_cursor() as cursor:
for analysis_accession, submission_file_id, filename, file_md5, file_type, status_id in cursor.execute(query):
yield analysis_accession, submission_file_id, filename, file_md5, file_type, status_id

Loading
Loading