-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathgbk2fasta.py
163 lines (139 loc) · 7.26 KB
/
gbk2fasta.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
import os
from pathlib import Path
from pathlib import PurePath
from Bio import SeqIO
from BioSQL import BioSeqDatabase
from dir_mana import dir_mana
from lister import Lister
home = os.getcwd()
project = "GPCR-Orthologs-Project"
user = "rgilmore"
where = dir_mana(home, project)
# Use lister() class here so that we can easily access our Master RNA Accession File
what = Lister('MAFV3.1.csv') # Always make sure this file name is correct
class Gbk2Fasta(object):
"""
This class accepts .gbk files or .db files (loaded with target .gbk files
with BioPython schema). The output is a group of FASTA files. The
different types of regions that are specified by the GenBank files get
their own directories.
"""
def __init__(self, path=Path(os.getcwd()), db_flag=False, gbk_flag=False):
# Where are the files?
path = Path('C:\\Users\\rgilmore\\PycharmProjects\\GPCR-Orthologs-Project\\CODE\\1_Databases\\Vallender_Data')
self.path = path
# Parse .gbk files, .db files, or both
os.chdir(self.path)
for f_db in os.listdir():
if '.db' in PurePath(f_db).suffix:
self.db(f_db)
elif '.gbk' in PurePath(f_db).suffix:
self.gbk(f_db)
def db(self, database):
"""
Create FASTA files for every GenBank record in the database.
"""
server = BioSeqDatabase.open_database(driver="sqlite3", db=database)
try:
for db_name in server.keys():
db = server[db_name]
for item in db.keys():
record = db.lookup(item)
self.write_fasta_file(record)
except:
raise()
def gbk(self, file):
"""
Write a group of fasta files from a group of .gbk files in the directory.
The specified self.path must be an empty directory with .gbk files
"""
record = SeqIO.read(file, 'genbank')
self.write_fasta_file(record)
def write_fasta_file(self, record):
feature_list = []
for feature in record.features:
m = 'MASTER'
accession = record.id
try:
organism = what.acc_dict[accession][1]
except KeyError:
accession = str(record.id).lower()
organism = what.acc_dict[accession][1]
gi = record.annotations['gi']
feat = str(feature.type)
feature_list.append(feature.type)
# Add this in the name_fasta_file definition
# If there are duplicates of feature.type
# then add a number to the end of the name
if feature.type in feature_list:
x = feature_list.count(feature.type) # Number of duplicates
feat += str(x) # Adding the number to the name of the feature.type
if feature.type == "CDS":
# Create a .ffn file (FASTA for Coding Nucleic Acids)
file = self.name_fasta_file(accession, feature.type, feat, extension='.ffn')
self.file.write(">gi|" + gi + "|ref|" + accession + '| ' + record.description + "\n"
+ str(feature.extract(record.seq)))
self.file.close()
# Create a .faa file (FASTA for Amino Acids)
self.file = self.name_fasta_file(accession, 'Protein', feat, extension='.faa')
gi_p = self.protein_gi_fetch(feature)
self.file.write(">gi|" + str(gi_p) + "|ref|"
+ str(feature.qualifiers['protein_id'][0]) + '| '
+ str(feature.qualifiers['product'][0]) + ' ' + str(organism) + '\n'
+ str(feature.qualifiers['translation'][0]))
self.file.close()
# Create a MASTER .ffn file (multi-FASTA file for Coding Nucleic Acids)
self.file = self.name_fasta_file(accession, m, feat, extension='.ffn')
self.file.write(">gi|" + gi + "|ref|" + accession + '| ' + record.description + "\n"
+ str(feature.extract(record.seq)) + "\n")
self.file.close()
# Create a MASTER .faa file (multi-FASTA file for Amino Acids)
self.file = self.name_fasta_file(accession, m, feat, extension='.faa')
gi_p = self.protein_gi_fetch(feature)
self.file.write(">gi|" + str(gi_p) + "|ref|"
+ str(feature.qualifiers['protein_id'][0]) + '| '
+ str(feature.qualifiers['product'][0]) + ' ' + organism + '\n'
+ str(feature.qualifiers['translation'][0]) + '\n')
self.file.close()
elif feature.type == "misc_feature":
# Creates .fna files (generic FASTA file for Nucleic Acids)
self.file = self.name_fasta_file(accession, feature.type, feat, extension='.fna')
self.file.write(">gi|" + gi + "|ref|" + accession + '| ' + record.description
+ ' Feature: ' + str(feature.qualifiers['note'][0]) + '\n'
+ str(feature.extract(record.seq)))
self.file.close()
self.file = self.name_fasta_file(accession, m, feat, extension='.fna')
self.file.write(">gi|" + gi + "|ref|" + accession + '| ' + record.description
+ ' Feature: ' + str(feature.qualifiers['note'][0]) + '\n'
+ str(feature.extract(record.seq)) + "\n")
self.file.close()
elif feature.type != "variation":
print(feature.type)
# Creates .fasta files (generic FASTA file)
self.file = self.name_fasta_file(accession, 'Other', feat, extension='.fasta')
self.file.write(">gi|" + gi + "|ref|" + accession + '| ' + record.description + "\n"
+ str(feature.extract(record.seq)) + "\n")
self.file.close()
self.file = self.name_fasta_file(accession, 'Other', feat, extension='.fasta')
self.file.write(">gi|" + gi + "|ref|" + accession + '| ' + record.description + "\n"
+ str(feature.extract(record.seq)) + "\n")
self.file.close()
def name_fasta_file(self, accession, f_type, feature_no, extension):
gene = what.acc_dict[accession][0]
organism = what.acc_dict[accession][1]
if os.path.isdir(self.path / Path(f_type)) is False:
os.mkdir(self.path / Path(f_type))
if os.path.isdir(self.path / Path(f_type) / Path(gene)) is False:
os.mkdir(self.path / Path(f_type) / Path(gene))
if f_type == 'MASTER':
mode = 'a'
self.file = open(self.path / Path(f_type) / Path(gene) / Path('%s_%s%s' % (gene, feature_no, extension)), mode)
else:
mode = 'w'
self.file = open(self.path / Path(f_type) / Path(gene) / Path('%s_%s_%s%s' % (gene, organism, feature_no, extension)), mode)
return self.file
def protein_gi_fetch(self, feature):
for x in feature.qualifiers:
if 'GI' in x:
head, sup, gi_p = x.partition(':')
return gi_p