forked from hakyimlab/QTL_to_PredictDB
-
Notifications
You must be signed in to change notification settings - Fork 0
/
DataFrameStreamer.py
332 lines (265 loc) · 12.8 KB
/
DataFrameStreamer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
import gzip
import pandas
import logging
import Utilities
import Constants
import os
import re
logging.basicConfig(level=logging.DEBUG, format="%(asctime)s: %(message)s")
def to_rsid(snp_dict, chr, pos):
    """Translate a (chromosome, position) pair into an rsid.

    snp_dict: a dict mapping (chr, pos) -> rsid, or a path to a dictionary
        file with two columns (chr:pos and rsid).
    Returns the rsid, or None when the position is not in the dictionary.
    """
    if isinstance(snp_dict, str):
        # NOTE(review): load_snp_dictionary is not defined in this module;
        # presumably provided elsewhere — confirm before using the path form.
        snp_dict = load_snp_dictionary(snp_dict)
    elif not isinstance(snp_dict, dict):
        exit("Argument rsid_dict must be a path to a dictionary file or a python dictionary.")
    # Fixed: the body previously referenced `rsid_dict`, which does not
    # exist (the parameter is named `snp_dict`) — every call raised NameError.
    try:
        return snp_dict[(chr, pos)]
    except KeyError:
        return None
def get_col_indexes(header_comps, params):
    """Map Constants column keys to their positions in the file header.

    header_comps: the header line already split into column names.
    params: run parameters naming the columns; the mandatory SNP, gene,
        p-value and beta columns are assumed present (see TODO below).
    Returns a dict {Constants key: column index}.
    """
    # TODO: Assuming that "params" is correct, i.e. parameters passed as arguments do appear in the file header.
    # TODO: consider the case where the header is not the first line in the file, adding a skip_until_header argument)
    indexes = {
        Constants.SNP: header_comps.index(params.snp_column),
        Constants.GENENAME: header_comps.index(params.gene_column),
        Constants.PVALUE: header_comps.index(params.pvalue_column),
        Constants.BETA: header_comps.index(params.beta_column),
    }
    # Optional columns: only looked up when the parameter was supplied.
    optional_columns = (
        (Constants.SE, params.se_column),
        (Constants.REF_ALLELE, params.ref_allele_column),
        (Constants.ALT_ALLELE, params.alt_allele_column),
        (Constants.CHROMOSOME, params.chromosome_column),
        (Constants.POSITION, params.position_column),
    )
    for key, column_name in optional_columns:
        if column_name is not None:
            indexes[key] = header_comps.index(column_name)
    return indexes
def to_dataframe(data, columns, to_numeric=None, fill_na=None):
    """Build a pandas DataFrame with the given column order from a list of rows.

    data: list of row sequences; transposed so each output column is one field.
    columns: ordered column names for the resulting frame.
    to_numeric: if set, used as the ``errors`` mode of pandas.to_numeric
        applied to every column (e.g. "ignore", "coerce").
    fill_na: if not None, value used to fill missing entries.
    """
    data = list(zip(*data))  # rows -> columns
    if to_numeric:
        data = [pandas.to_numeric(x, errors=to_numeric) for x in data]
    if len(data) == 0:
        # no rows at all: still produce one (empty) column per requested name
        data = [[] for _ in range(len(columns))]
    data = {columns[i]: data[i] for i in range(len(columns))}
    data = pandas.DataFrame(data)
    data = data[columns]  # enforce the requested column order
    # Fixed: was `if fill_na:`, which silently skipped falsy fillers such as 0.
    if fill_na is not None:
        data = data.fillna(fill_na)
    return data
def extract_chr_pos(snpid, regex):
    """Parse chromosome and position out of a SNP id string using *regex*.

    regex must define named groups "chromosome" and "position".
    Returns {Constants.CHROMOSOME: ..., Constants.POSITION: ...}.
    """
    # Fixed: the original called snpid.group(...) on the raw string; the id
    # must first be matched against the regex (mirrors extract_chr_pos_ea_nea).
    match = regex.match(snpid)
    chromosome, position = match.group("chromosome", "position")
    return {Constants.CHROMOSOME: chromosome, Constants.POSITION: position}
def extract_chr_pos_ea_nea(snpid, regex):
    """Parse chromosome, position and both alleles from a SNP id string.

    regex must define the named groups "chromosome", "position",
    "effect_allele" and "non_effect_allele". The effect allele is reported
    as the ALT allele, the non-effect allele as the REF allele.
    """
    match = regex.match(snpid)
    return {
        Constants.CHROMOSOME: match.group("chromosome"),
        Constants.POSITION: match.group("position"),
        Constants.REF_ALLELE: match.group("non_effect_allele"),
        Constants.ALT_ALLELE: match.group("effect_allele"),
    }
def get_rsid(snpid):
    """Treat the whole SNP id as an rsid and wrap it in the parser's dict form."""
    info = {Constants.SNP: snpid}
    return info
def build_snp_dictionary(filepath, args, rsid_as_key = False):
    """Read a SNP annotation file into a lookup dictionary.

    filepath: path to a (possibly gzipped) whitespace-delimited file whose
        first line is a header containing the columns named by
        args.snp_annot_snpid_column / snp_annot_chr_column / snp_annot_position_column.
    rsid_as_key: if True the mapping is rsid -> (chr, pos); otherwise
        (chr, pos) -> rsid.
    """
    # Fixed: open gzipped files in text mode ("rt"); the default binary mode
    # yields bytes under Python 3, so comps.index(str_column) would fail.
    _open = (lambda f: gzip.open(f, "rt")) if filepath.endswith(".gz") else open
    dictionary = {}
    with _open(filepath) as d:
        for i, line in enumerate(d):
            comps = line.strip().split()
            if i == 0:
                # header: locate the columns named by args
                snpid_idx = comps.index(args.snp_annot_snpid_column)
                chr_idx = comps.index(args.snp_annot_chr_column)
                pos_idx = comps.index(args.snp_annot_position_column)
                continue
            chromosome = comps[chr_idx]
            position = comps[pos_idx]
            rsid = comps[snpid_idx]
            if rsid_as_key:
                dictionary[rsid] = (chromosome, position)
            else:
                dictionary[(chromosome, position)] = rsid
    return dictionary
def decide_snpid_parser(snpid_format):
    """Choose how SNP ids in the eQTL file should be interpreted.

    snpid_format: either the literal "rsid" or a regex template containing
        the named groups used by the extract_* parsers.
    Returns a (snpid_type, parser) pair:
        1 -> plain rsid (get_rsid)
        2 -> regex with chromosome/position groups (extract_chr_pos)
        3 -> regex that also carries effect/non-effect alleles (extract_chr_pos_ea_nea)
    Exits the process for unsupported formats.
    """
    if snpid_format == "rsid":
        return 1, get_rsid
    elif "(?P<chromosome>.*)" in snpid_format and "(?P<position>.*)" in snpid_format:
        if "(?P<effect_allele>.*)" in snpid_format and "(?P<non_effect_allele>.*)" in snpid_format:
            return 3, extract_chr_pos_ea_nea
        else:
            # Fixed: previously referenced `extract_chr_and_pos`, which is not
            # defined anywhere in this module; the defined parser is extract_chr_pos.
            return 2, extract_chr_pos
    else:
        exit("SNP ID format not supported.")
# Estimate the number of lines from the file size and the number of bytes of one line.
def total_n_of_snps_in_eQTL(filename):
    """Roughly estimate how many data lines *filename* contains.

    The estimate is file size divided by the length of the first data line,
    rounded down to a multiple of 100000. Returns None for gzipped files
    (compressed size is not a usable proxy) and for files with no data line.
    """
    if filename.endswith("gz"):
        # if the file is gzipped we cannot do a reasonable estimation of the number of lines
        return None
    with open(filename) as eqtl:
        eqtl.readline() # skip header #TODO: consider headers with several lines
        first_line = eqtl.readline()
    if not first_line:
        # header-only or empty file: no sample line, so no estimate
        # (also avoids ZeroDivisionError below)
        return None
    filesize = os.path.getsize(filename)
    est_n_lines = filesize // len(first_line)
    # Fixed: `100000 * (x / 100000)` is a no-op with float division; the
    # intent (round down to a multiple of 100000) needs floor division.
    est_n_lines = 100000 * (est_n_lines // 100000)
    return est_n_lines
def show_progress(progress, current_line, total_lines):
    """Print percent-complete when it advances; return the current percent.

    progress: the last percent value reported (caller keeps it between calls).
    total_lines: estimated total, or None when no estimate is available,
        in which case nothing is printed and *progress* is passed through.
    """
    if total_lines is None:
        # No estimate: keep the caller's counter intact.
        # (Fixed: previously fell through and returned None, clobbering it.)
        return progress
    k = int(100 * current_line / total_lines)
    if progress != k:
        progress = k
        # Fixed: the format string contained "\%", printing a literal backslash.
        print("{}% completed ({}/~{})".format(k, current_line, total_lines))
    return progress
def get_df_colnames():
    """Return the ordered column names used for the per-gene eQTL dataframes."""
    return [
        Constants.CHROMOSOME,
        Constants.POSITION,
        Constants.REF_ALLELE,
        Constants.ALT_ALLELE,
        Constants.SNP,
        Constants.VAR_ID,
        Constants.GENENAME,
        Constants.BETA,
        Constants.PVALUE,
    ]
# get the relevant information from the rows of a eQTL file from the GTEx consortium
# the SNP ID for these files is chr_pos_alt_ref_build, e.g. 1_123456789_A_G_b37
# ignore_if_not_in_snp_annot == True: ignore rows with SNPs absent from dictionary***
def process_row(comps, pval_thr, indexes, \
                snp_dictionary, \
                snpid_parser, \
                info_from_snp_id, \
                ignore_if_not_in_snp_annot = True, \
                ignore_indels = True, snpid_regex=None):
    """Extract one standardized row from a split eQTL line.

    comps: one file line already split into fields.
    pval_thr: p-value cutoff; rows above it are dropped.
    indexes: column indexes from get_col_indexes.
    snp_dictionary: from build_snp_dictionary — rsid-keyed when
        info_from_snp_id == 1, (chr, pos)-keyed otherwise.
    snpid_parser / info_from_snp_id: pair returned by decide_snpid_parser.
    snpid_regex: compiled regex, used by parser types 2 and 3.
    Returns [chromosome, position, ref_allele, alt_allele, var_id, snp_id,
    genename, beta, pvalue] (all as strings from the file), or None when
    the row is filtered out.
    """
    # cheapest filter first: drop rows above the p-value threshold
    pvalue = comps[indexes[Constants.PVALUE]]
    if float(pvalue) > pval_thr:
        return None
    snp_id = comps[indexes[Constants.SNP]]
    # extract info from snp ID.
    if info_from_snp_id == 1: # rsid: alleles come from file columns, chr/pos from the dictionary
        info_from_snp = snpid_parser(snp_id)
        snp_id = info_from_snp[Constants.SNP]
        if snp_id not in snp_dictionary and ignore_if_not_in_snp_annot:
            return None
        ref_allele = comps[indexes[Constants.REF_ALLELE]]
        alt_allele = comps[indexes[Constants.ALT_ALLELE]]
        var_id = snp_id
        # NOTE(review): raises KeyError when ignore_if_not_in_snp_annot is
        # False and the rsid is missing from the dictionary — confirm intended.
        chromosome, position = snp_dictionary[var_id]
    elif info_from_snp_id == 2: # chr, pos in the id; alleles from file columns; rsid from dictionary
        info_from_snp = snpid_parser(snp_id, snpid_regex)
        chromosome, position = info_from_snp[Constants.CHROMOSOME], info_from_snp[Constants.POSITION]
        ref_allele = comps[indexes[Constants.REF_ALLELE]]
        alt_allele = comps[indexes[Constants.ALT_ALLELE]]
        key = (chromosome, position)
        if key not in snp_dictionary and ignore_if_not_in_snp_annot:
            return None
        var_id = snp_dictionary[key]
    elif info_from_snp_id == 3: # chr, pos, ref, alt all encoded in the id; rsid from dictionary
        info_from_snp = snpid_parser(snp_id, snpid_regex)
        chromosome, position, ref_allele, alt_allele = \
            info_from_snp[Constants.CHROMOSOME], \
            info_from_snp[Constants.POSITION], \
            info_from_snp[Constants.REF_ALLELE], \
            info_from_snp[Constants.ALT_ALLELE]
        key = (chromosome, position)
        if key not in snp_dictionary and ignore_if_not_in_snp_annot:
            return None
        snp_id = snp_dictionary[key]
        var_id = snp_id
    # if we are ignoring indels and the variant happens to be one...
    # (an allele longer than one base, or otherwise outside Constants.alleles)
    if ignore_indels and (ref_allele not in Constants.alleles or alt_allele not in Constants.alleles):
        return None
    genename = comps[indexes[Constants.GENENAME]]
    beta = comps[indexes[Constants.BETA]]
    row = [chromosome, position, ref_allele, alt_allele, var_id, \
           snp_id, genename, beta, pvalue]
    return row
def include_gene_(gene, white_list, black_list):
    """Return True when *gene* should be processed.

    A gene is included when it appears in the white list (or no white list
    was given) and does not appear in the black list (or no black list was
    given).
    """
    if white_list is not None and gene not in white_list:
        return False
    return black_list is None or gene not in black_list
# FIXME: function is too long
def read_eQTL_file(eqtl_file, params, snpid_regex=None):
    """Generator: stream an eQTL file and yield one DataFrame per gene.

    Assumes the file is sorted by gene (rows for a gene are contiguous);
    buffers rows for the current gene and yields a dataframe (columns from
    get_df_colnames) each time the gene changes, then once more at EOF for
    the trailing buffer. Rows are filtered by process_row (p-value cutoff,
    SNP annotation membership, indels) and genes by the white/black lists.
    Stops early once every white-listed gene has been yielded.
    """
    found = set()  # white-listed genes already yielded
    gene_column = params.gene_column # sentinel_column
    data = pandas.DataFrame(columns=get_df_colnames())
    if params.pval_threshold is None:
        params.pval_threshold = 1  # no cutoff: every p-value passes
    else:
        params.pval_threshold = float(params.pval_threshold)
    # estimate number of SNPs in QTL file, in order to display progress of model generation
    total_snps = total_n_of_snps_in_eQTL(eqtl_file)
    total_for_gene = 0
    passed_for_gene = 0
    number_of_genes_passed = 0
    # NOTE(review): under Python 3, gzip.open without "rt" yields bytes, so
    # the split fields would not match str column names — confirm gz inputs.
    _open = gzip.open if eqtl_file.endswith("gz") else open
    with _open(eqtl_file) as ifile:
        sentinel = None  # gene the buffered rows belong to
        buf = []         # processed rows accumulated for the current gene
        progress = 0
        number_of_genes_passed = 0
        for i, line in enumerate(ifile):
            comps = line.strip().split()
            if i == 0: # HEADER # TODO: consider the possibility of headers with multiple lines
                Utilities.prevalidate_format(params, comps)
                indexes = get_col_indexes(comps, params)
                gene_index = indexes[Constants.GENENAME]
                continue
            # Decide how to treat SNP ID (it depends on the info it contains)
            if i == 1: # FIRST LINE OF DATA
                snpid = comps[indexes[Constants.SNP]]
                snpid_type, snpid_parser = decide_snpid_parser(params.snpid_format)
                if snpid_type == 1:
                    # rsid ids: dictionary keyed by rsid -> (chr, pos)
                    snp_dictionary = build_snp_dictionary(params.snp_annot_file, params, rsid_as_key=True)
                elif snpid_type in (2,3):
                    # positional ids: dictionary keyed by (chr, pos) -> rsid
                    snp_dictionary = build_snp_dictionary(params.snp_annot_file, params, rsid_as_key=False)
                Utilities.validate_input_parameters(params, snpid_type)
            progress = show_progress(progress, i, total_snps)
            row_sentinel = comps[gene_index]
            if sentinel is None:
                # very first data row: initialize the current gene
                sentinel = row_sentinel
                include_gene = include_gene_(sentinel, params.gene_white_list, params.gene_black_list)
            # if the current row has still the same gene as the previous
            if row_sentinel == sentinel:
                if include_gene:
                    total_for_gene += 1
                    new_row = process_row(comps, params.pval_threshold, indexes, \
                                          snp_dictionary, snpid_parser, \
                                          snpid_type, params.ignore_if_not_in_snp_annot, \
                                          params.ignore_indels, snpid_regex)
                    if new_row is not None:
                        passed_for_gene += 1
                        buf.append(new_row)
                continue
            else: # if the gene just changed, we have to decide whether we have to include it or not
                include_gene = include_gene_(sentinel, params.gene_white_list, params.gene_black_list)
                if include_gene:
                    passed_for_gene += 1
                    # if we're here, this is the last line for the current gene (because the gene just changed),
                    # therefore we write the data into a pandas dataframe
                    # logging.info("Done with gene {} ({}/{} SNPs passed)".format(sentinel, passed_for_gene, total_for_gene))
                    data = to_dataframe(buf, get_df_colnames(), to_numeric="ignore")
                    yield data
                    passed_for_gene = 0
                    total_for_gene = 0
                    if params.gene_white_list is not None:
                        found.add(sentinel)
                        if len(found) == len(params.gene_white_list):
                            logging.info("Found everything in the whitelist")
                            return
                # start buffering the gene this row belongs to
                # NOTE(review): include_gene is not recomputed for the NEW
                # sentinel here, so subsequent rows of the new gene reuse the
                # previous gene's inclusion flag — looks like a bug; confirm.
                new_row = process_row(comps, params.pval_threshold, indexes, \
                                      snp_dictionary, snpid_parser, \
                                      snpid_type, params.ignore_if_not_in_snp_annot, \
                                      params.ignore_indels, snpid_regex)
                buf = [new_row] if new_row else []
                sentinel = comps[gene_index]
    # If we reached the end of the file and buf still contains something...
    if len(buf):
        data = to_dataframe(buf, get_df_colnames(), to_numeric="ignore")
        yield data