-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathdante_gff_output_filtering.py
executable file
·364 lines (327 loc) · 14.7 KB
/
dante_gff_output_filtering.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
#!/usr/bin/env python3
import sys
import time
import configuration
import os
import textwrap
import subprocess
from tempfile import NamedTemporaryFile
from collections import defaultdict
class Range():
    '''
    Closed float interval usable as an argparse ``choices`` entry.

    argparse validates a parsed value by testing ``value == choice``, so
    equality is defined as interval membership.
    '''

    def __init__(self, start, end):
        # inclusive lower / upper bounds
        self.start = start
        self.end = end

    def __eq__(self, other):
        # membership test: start <= other <= end
        return self.start <= other <= self.end

    def __str__(self):
        return "float range {}..{}".format(self.start, self.end)

    def __repr__(self):
        return "float range {}..{}".format(self.start, self.end)
def check_file_start(gff_file):
    """Return the number of leading '#' comment lines in *gff_file*."""
    n_comments = 0
    with open(gff_file, "r") as handle:
        for row in handle:
            if not row.startswith("#"):
                break
            n_comments += 1
    return n_comments
def write_info(filt_dom_tmp, FILT_DOM_GFF, orig_class_dict, filt_class_dict,
               dom_dict, version_lines, TH_IDENTITY, TH_SIMILARITY,
               TH_LENGTH, TH_INTERRUPT, TH_LEN_RATIO, SELECTED_DOM):
    '''
    Write domain statistics as '##' comment lines at the beginning of the
    filtered GFF, then append the filtered records from the temp file.

    filt_dom_tmp     -- temp file object (only ``.name`` is read) holding the
                        already-filtered GFF records
    FILT_DOM_GFF     -- path of the final output GFF (overwritten)
    orig_class_dict  -- classification -> count before filtering
    filt_class_dict  -- classification -> count after filtering
    dom_dict         -- seq_id -> {domain name -> count} after filtering
    version_lines    -- header comment lines copied from the input GFF
    remaining args   -- the filtering thresholds, echoed into the header
    '''
    with open(FILT_DOM_GFF, "w") as filt_gff:
        for line in version_lines:
            filt_gff.write(line)
        # FIX: corrected typos in the emitted header ("thresholdss",
        # "alingment", "interuptions") — metadata comment only, the record
        # lines themselves are unchanged
        filt_gff.write(("##Filtering thresholds: min identity: {}, min similarity: {},"
                        " min relative alignment length: {}, max interruptions(stop or "
                        "frameshift): {}, max relative alignment length: {}, selected"
                        " domains: {} \n").format(TH_IDENTITY,
                                                  TH_SIMILARITY,
                                                  TH_LENGTH,
                                                  TH_INTERRUPT,
                                                  TH_LEN_RATIO,
                                                  SELECTED_DOM))
        filt_gff.write("##CLASSIFICATION\tORIGINAL_COUNTS\tFILTERED_COUNTS\n")
        if not orig_class_dict:
            filt_gff.write("##NO DOMAINS CLASSIFICATIONS\n")
        for classification in sorted(orig_class_dict.keys()):
            # classifications filtered out entirely are reported with count 0;
            # .get avoids inserting keys when filt_class_dict is a defaultdict
            filt_gff.write("##{}\t{}\t{}\n".format(
                classification, orig_class_dict[classification],
                filt_class_dict.get(classification, 0)))
        filt_gff.write("##-----------------------------------------------\n"
                       "##SEQ\tDOMAIN\tCOUNTS\n")
        if not dom_dict:
            filt_gff.write("##NO DOMAINS\n")
        for seq in sorted(dom_dict.keys()):
            for dom, count in sorted(dom_dict[seq].items()):
                filt_gff.write("##{}\t{}\t{}\n".format(seq, dom, count))
        filt_gff.write("##-----------------------------------------------\n")
        # append the filtered GFF records below the statistics header
        with open(filt_dom_tmp.name, "r") as filt_tmp:
            for line in filt_tmp:
                filt_gff.write(line)
def get_file_start(gff_file):
    """Return (count, lines) for the leading '#' comment block of *gff_file*."""
    header_lines = []
    with open(gff_file, "r") as handle:
        for row in handle:
            if not row.startswith("#"):
                break
            header_lines.append(row)
    return len(header_lines), header_lines
def parse_gff_line(line):
    '''Return a dict with the nine GFF3 columns plus parsed attributes.

    All field values are kept as strings; *line* is not stripped, so the
    trailing newline stays attached to the value of the last attribute
    (callers use the raw field text).
    '''
    # order of the first 9 columns is fixed by the GFF3 spec
    gff_line = dict(
        zip(
            ['seqid', 'source', 'type', 'start', 'end',
             'score', 'strand', 'phase', 'attributes'],
            line.split("\t")
        )
    )
    # FIX: split each attribute on the FIRST '=' only, so values that
    # themselves contain '=' no longer make dict() fail on a 3-element list
    gff_line['attributes'] = dict(
        item.split("=", 1) for item in gff_line['attributes'].split(";")
    )
    return gff_line
def filter_qual_dom(DOM_GFF, FILT_DOM_GFF, TH_IDENTITY, TH_SIMILARITY,
                    TH_LENGTH, TH_INTERRUPT, TH_LEN_RATIO, SELECTED_DOM,
                    ELEMENT):
    ''' Filter the DANTE domain GFF by alignment quality and domain type.

    A record is kept when identity, similarity and relative alignment
    length reach their minima, interruptions and DB-length proportion stay
    under their maxima, the domain Name matches SELECTED_DOM (or
    SELECTED_DOM == "All"), and ELEMENT is a substring of the record's
    Final_Classification. Records tagged configuration.AMBIGUOUS_TAG are
    always dropped.

    Side effect: writes FILT_DOM_GFF (statistics header + surviving
    records, via write_info).

    Returns per-sequence parallel lists:
    (xminimals_all, xmaximals_all, domains_all, seq_ids_all).
    '''
    count_comment, version_lines = get_file_start(DOM_GFF)
    filt_dom_tmp = NamedTemporaryFile(delete=False)
    with open(DOM_GFF, "r") as gff_all, open(filt_dom_tmp.name,
                                             "w") as gff_filtered:
        # skip the header comments already captured in version_lines
        for _ in range(count_comment):
            next(gff_all)
        dom_dict = defaultdict(lambda: defaultdict(int))
        orig_class_dict = defaultdict(int)
        filt_class_dict = defaultdict(int)
        seq_ids_all = []
        xminimals = []
        xmaximals = []
        domains = []
        xminimals_all = []
        xmaximals_all = []
        domains_all = []
        start = True
        for line in gff_all:
            # skip any interleaved comments
            if line.startswith("#"):
                continue
            gff_line = parse_gff_line(line)
            classification = gff_line['attributes']['Final_Classification']
            orig_class_dict[classification] += 1
            # ambiguous domains are filtered out automatically
            if classification != configuration.AMBIGUOUS_TAG:
                al_identity = float(gff_line['attributes']['Identity'])
                al_similarity = float(gff_line['attributes']['Similarity'])
                al_length = float(gff_line['attributes']['Relat_Length'])
                relat_interrupt = float(
                    gff_line['attributes']['Relat_Interruptions'])
                db_len_proportion = float(
                    gff_line['attributes']['Hit_to_DB_Length'])
                dom_type = gff_line['attributes']['Name']
                seq_id = gff_line['seqid']
                xminimal = int(gff_line['start'])
                xmaximal = int(gff_line['end'])
                c1 = al_identity >= TH_IDENTITY
                c2 = al_similarity >= TH_SIMILARITY
                if (c1 and c2 and al_length >= TH_LENGTH and
                        relat_interrupt <= TH_INTERRUPT and
                        db_len_proportion <= TH_LEN_RATIO and
                        (dom_type == SELECTED_DOM or SELECTED_DOM == "All") and
                        (ELEMENT in classification)):
                    # FIX: write() instead of writelines() — the argument is
                    # one string, not a sequence of lines
                    gff_filtered.write(line)
                    filt_class_dict[classification] += 1
                    dom_dict[seq_id][dom_type] += 1
                    if start:
                        seq_ids_all.append(line.split("\t")[0])
                        start = False
                    if seq_id != seq_ids_all[-1]:
                        # new sequence encountered: flush the per-sequence
                        # accumulators and start fresh lists for this seq_id
                        seq_ids_all.append(seq_id)
                        xminimals_all.append(xminimals)
                        xmaximals_all.append(xmaximals)
                        domains_all.append(domains)
                        xminimals = []
                        xmaximals = []
                        domains = []
                    xminimals.append(xminimal)
                    xmaximals.append(xmaximal)
                    domains.append(dom_type)
    # past the `with`: the temp file is flushed/closed before write_info
    # copies it into the final output
    # FIX: removed unused local `path = os.path.dirname(os.path.realpath(...))`
    write_info(filt_dom_tmp, FILT_DOM_GFF, orig_class_dict, filt_class_dict,
               dom_dict, version_lines, TH_IDENTITY, TH_SIMILARITY,
               TH_LENGTH, TH_INTERRUPT, TH_LEN_RATIO, SELECTED_DOM)
    os.unlink(filt_dom_tmp.name)
    # flush the accumulators of the last sequence
    xminimals_all.append(xminimals)
    xmaximals_all.append(xmaximals)
    domains_all.append(domains)
    return xminimals_all, xmaximals_all, domains_all, seq_ids_all
def get_domains_protseq(FILT_DOM_GFF, DOMAIN_PROT_SEQ):
    ''' Write a FASTA of translated protein sequences for filtered domains.

    The translated sequences are taken from the alignment reported by
    LASTAL (the Query_Seq attribute in the GFF); alignment artefact
    characters ('/', '\\', '-') are removed before writing.
    '''
    n_header = check_file_start(FILT_DOM_GFF)
    with open(FILT_DOM_GFF, "r") as filt_gff:
        # skip the statistics/comment header
        for _ in range(n_header):
            next(filt_gff)
        with open(DOMAIN_PROT_SEQ, "w") as dom_prot_file:
            for record in filt_gff:
                fields = record.rstrip().split("\t")
                seq_id = fields[0]
                attr_parts = fields[8].split(";")
                # attributes are parsed positionally: 0=Name, 1=Final_
                # Classification, 3=best-hit positions, 6=aligned query seq
                # (assumes DANTE's fixed attribute order — TODO confirm)
                positions = attr_parts[3].split("=")[1].split(":")[
                    -1].split("[")[0]
                dom = attr_parts[0].split("=")[1]
                dom_class = attr_parts[1].split("=")[1]
                aligned = attr_parts[6].split("=")[1]
                # strip frameshift/gap markers from the alignment string
                prot_seq = aligned.translate({ord(ch): None
                                              for ch in '/\\-'})
                fasta_header = ">{}:{} {} {}".format(seq_id, positions, dom,
                                                     dom_class)
                dom_prot_file.write("{}\n{}\n".format(
                    fasta_header, textwrap.fill(prot_seq,
                                                configuration.FASTA_LINE)))
def main(args):
    '''Entry point: resolve output paths, filter domains, export protein seqs.'''
    started = time.time()

    # unpack CLI arguments
    DOM_GFF = args.dom_gff
    DOMAIN_PROT_SEQ = args.domains_prot_seq
    FILT_DOM_GFF = args.domains_filtered
    SELECTED_DOM = args.selected_dom
    OUTPUT_DIR = args.output_dir
    TH_IDENTITY = args.th_identity
    TH_SIMILARITY = args.th_similarity
    TH_LENGTH = args.th_length
    TH_INTERRUPT = args.interruptions
    TH_LEN_RATIO = args.max_len_proportion
    ELEMENT = args.element_type

    # fall back to configured default output file names
    if DOMAIN_PROT_SEQ is None:
        DOMAIN_PROT_SEQ = configuration.DOM_PROT_SEQ
    if FILT_DOM_GFF is None:
        FILT_DOM_GFF = configuration.FILT_DOM_GFF
    if OUTPUT_DIR and not os.path.exists(OUTPUT_DIR):
        os.makedirs(OUTPUT_DIR)

    # relative output paths go next to the input GFF unless a directory
    # was given explicitly
    if not os.path.isabs(FILT_DOM_GFF):
        if OUTPUT_DIR is None:
            OUTPUT_DIR = os.path.dirname(os.path.abspath(DOM_GFF))
        FILT_DOM_GFF = os.path.join(OUTPUT_DIR,
                                    os.path.basename(FILT_DOM_GFF))
        DOMAIN_PROT_SEQ = os.path.join(OUTPUT_DIR,
                                       os.path.basename(DOMAIN_PROT_SEQ))

    xminimals_all, xmaximals_all, domains_all, seq_ids_all = filter_qual_dom(
        DOM_GFF, FILT_DOM_GFF, TH_IDENTITY, TH_SIMILARITY, TH_LENGTH,
        TH_INTERRUPT, TH_LEN_RATIO, SELECTED_DOM, ELEMENT)
    get_domains_protseq(FILT_DOM_GFF, DOMAIN_PROT_SEQ)

    print("ELAPSED_TIME_DOMAINS = {} s".format(time.time() - started))
if __name__ == "__main__":
    import argparse
    from argparse import RawDescriptionHelpFormatter

    # Combine two stock formatters: show argument defaults in --help while
    # keeping the hand-wrapped description text exactly as written.
    class CustomFormatter(argparse.ArgumentDefaultsHelpFormatter,
                          argparse.RawDescriptionHelpFormatter):
        pass
    parser = argparse.ArgumentParser(
        description=
        '''The script performs DANTE's output filtering for quality and/or extracting specific type of protein domain or mobile elements of origin. For the filtered domains it reports their translated protein sequence of original DNA.
WHEN NO PARAMETERS GIVEN, IT PERFORMS QUALITY FILTERING USING THE DEFAULT PARAMETRES (optimized for Viridiplantae species)
INPUTS:
- GFF3 file produced by protein_domains.py OR already filtered GFF3
FILTERING OPTIONS:
> QUALITY: - Min relative length of alignemnt to the protein domain from DB (without gaps)
- Identity
- Similarity (scoring matrix: BLOSUM82)
- Interruption in the reading frame (frameshifts + stop codons) per every starting 100 AA
- Max alignment proportion to the original length of database domain sequence
> DOMAIN TYPE: choose from choices ('Name' attribute in GFF)
Records for ambiguous domain type (e.g. INT/RH) are filtered out automatically
> MOBILE ELEMENT TYPE:
arbitrary substring of the element classification ('Final_Classification' attribute in GFF)
OUTPUTS:
- filtered GFF3 file
- fasta file of translated protein sequences (from original DNA) for the aligned domains that match the filtering criteria
DEPENDENCIES:
- python 3.4 or higher
> ProfRep modules:
- configuration.py
EXAMPLE OF USAGE:
Getting quality filtered integrase(INT) domains of all gypsy transposable elements:
./domains_filtering.py -dom_gff PATH_TO_INPUT_GFF -pdb PATH_TO_PROTEIN_DB -cs PATH_TO_CLASSIFICATION_FILE --selected_dom INT --element_type Ty3/gypsy
''',
        epilog="""""",
        formatter_class=CustomFormatter)
    # Only the input GFF is mandatory; every threshold has a default tuned
    # for Viridiplantae (see description above).
    requiredNamed = parser.add_argument_group('required named arguments')
    requiredNamed.add_argument("-dg",
                               "--dom_gff",
                               type=str,
                               required=True,
                               help="basic unfiltered gff file of all domains")
    parser.add_argument("-ouf",
                        "--domains_filtered",
                        type=str,
                        help="output filtered domains gff file")
    parser.add_argument("-dps",
                        "--domains_prot_seq",
                        type=str,
                        help="output file containg domains protein sequences")
    # Range(0.0, 1.0) in `choices` accepts any float inside the closed
    # interval (Range.__eq__ implements containment).
    parser.add_argument("-thl",
                        "--th_length",
                        type=float,
                        choices=[Range(0.0, 1.0)],
                        default=0.8,
                        help="proportion of alignment length threshold")
    parser.add_argument("-thi",
                        "--th_identity",
                        type=float,
                        choices=[Range(0.0, 1.0)],
                        default=0.35,
                        help="proportion of alignment identity threshold")
    parser.add_argument("-ths",
                        "--th_similarity",
                        type=float,
                        choices=[Range(0.0, 1.0)],
                        default=0.45,
                        help="threshold for alignment proportional similarity")
    parser.add_argument(
        "-ir",
        "--interruptions",
        type=int,
        default=3,
        help=
        "interruptions (frameshifts + stop codons) tolerance threshold per 100 AA")
    parser.add_argument(
        "-mlen",
        "--max_len_proportion",
        type=float,
        default=1.2,
        help=
        "maximal proportion of alignment length to the original length of protein domain from database")
    parser.add_argument(
        "-sd",
        "--selected_dom",
        type=str,
        default="All",
        choices=[
            "All", "GAG", "INT", "PROT", "RH", "RT", "aRH", "CHDCR", "CHDII",
            "TPase", "YR", "HEL1", "HEL2", "ENDO"
        ],
        help="filter output domains based on the domain type")
    parser.add_argument(
        "-el",
        "--element_type",
        type=str,
        default="",
        help="filter output domains by typing substring from classification")
    parser.add_argument(
        "-dir",
        "--output_dir",
        type=str,
        default=None,
        help="specify if you want to change the output directory")
    args = parser.parse_args()
    main(args)