forked from hakyimlab/QTL_to_PredictDB
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Covariance.py
executable file
·352 lines (278 loc) · 13.5 KB
/
Covariance.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
#!/usr/bin/env python
__author__ = 'heroico'
# Import metascan as a directory
# These are adapted for python 3
import logging
import numpy
import os
import gzip
import ntpath
from metax import WeightDBUtilities
from metax import PrediXcanFormatUtilities
from metax import ThousandGenomesUtilities
from metax import Logging
from metax import Utilities
from metax import Formats
def pathLeaf(path):
head, tail = ntpath.split(path)
return tail or ntpath.basename(head)
class ProcessWeightDB(object):
def __init__(self, args):
self.weight_db = pathLeaf(args.weight_db)
self.db_path = args.weight_db
self.data_folder = args.input_folder
self.correlation_output = args.correlation_output
self.covariance_output = args.covariance_output
if args.covariance_output is None:
comp = os.path.splitext(self.weight_db)[0]
name = comp + ".cov.txt.gz"
path = os.path.join("intermediate", "cov")
path = os.path.join(path, name)
self.covariance_output = path
self.input_format = args.input_format
self.found_genes_for_covariance = {}
self.found_genes_for_correlation = {}
self.min_maf_filter = float(args.min_maf_filter) if args.min_maf_filter else None
self.max_maf_filter = float(args.max_maf_filter) if args.max_maf_filter else None
self.max_snps_in_gene = int(args.max_snps_in_gene) if args.max_snps_in_gene else None
def run(self):
if not self.correlation_output and not self.covariance_output:
logging.info("Provide --correlation_output or --covariance_output or both")
return
logging.info("Loading Weights")
weight_db_logic = WeightDBUtilities.WeightDBEntryLogic(self.db_path)
logging.info("Building files")
self.buildFiles(weight_db_logic)
logging.info("Ran successfully")
def buildFiles(self, weight_db_logic):
do_correlations = self.correlation_output is not None
if do_correlations:
if os.path.exists(self.correlation_output):
logging.info("%s already exists, delete it if you want it figured out again", self.correlation_output)
do_correlations = False
else:
correlation_dir = os.path.dirname(self.correlation_output)
if not os.path.exists(correlation_dir):
os.makedirs(correlation_dir)
self.writeFileHeader(self.correlation_output)
do_covariances = self.covariance_output is not None
if do_covariances:
if os.path.exists(self.covariance_output):
logging.info("%s already exists, delete it if you want it figured out again", self.covariance_output)
do_covariances = False
else:
covariance_dir = os.path.dirname(self.covariance_output)
if not os.path.exists(covariance_dir):
os.makedirs(covariance_dir)
self.writeFileHeader(self.covariance_output)
if not do_covariances and not do_correlations:
return
names = Utilities.dosageNamesFromFolder(self.data_folder)
for name in names:
snps, snps_by_rsid = self.getSNPS(name, weight_db_logic)
if do_correlations:
self.addToCorrelationFile(weight_db_logic, name, snps, snps_by_rsid)
if do_covariances:
self.addToCovarianceFile(weight_db_logic, name, snps, snps_by_rsid)
def writeFileHeader(self,path):
with gzip.open(path, "wt") as file:
file.write("GENE RSID1 RSID2 VALUE\n")
def getSNPS(self, name, weight_db_logic):
dosageLoader = None
if self.input_format == Formats.IMPUTE:
dosageLoader = ThousandGenomesUtilities.IMPUTEDosageLoader(self.data_folder, name) #outdated code
elif self.input_format == Formats.PrediXcan:
dosageName = Utilities.dosageName(name)
path = os.path.join(self.data_folder, dosageName)
dosageLoader = PrediXcanFormatUtilities.PrediXcanFormatDosageLoader(path, weight_db_logic)
else:
logging.info("Invalid input format: %s", self.input_format)
return
snps, snps_by_rsid = dosageLoader.load()
return snps, snps_by_rsid
def addToCovarianceFile(self, weight_db_logic, name, snps, snps_by_rsid):
logging.info("Adding to covariance for %s-%s", name, self.weight_db)
genes = list(weight_db_logic.weights_by_gene.keys())
total_genes = len(genes)
last_reported_percent = 0
processed = 0
for gene in genes:
processed += 1
percent = int(processed*100.0 / total_genes)
if percent == last_reported_percent+10:
logging.info("%d percent genes processed", percent)
last_reported_percent = percent
entries = self.buildCovarianceEntries(name, gene, weight_db_logic, snps_by_rsid)
if len(entries) == 0:
logging.log(6,"Gene %s has no snps in current file", gene)
continue
self.addToFile(self.covariance_output, gene, entries)
def addToFile(self, path, gene, entries):
with gzip.open(path, "wt") as file:
for entry in entries:
line = " ".join([gene, entry[0], entry[1], entry[2]])+"\n"
file.write(line)
def buildCovarianceEntries(self, name, gene, weight_db_logic, snps_by_rsid):
weights_in_gene = weight_db_logic.weights_by_gene[gene]
rsids_from_genes = list(weights_in_gene.keys())
#gather as much data as we can work on
related_rsids, related_data = self.buildRelatedData(rsids_from_genes, snps_by_rsid, weights_in_gene)
if len(related_rsids) == 0:
return []
self.updateFoundCovariance(gene, name)
#covariance matrix of related SNP's data
array = numpy.array(related_data)
cov = numpy.cov(array)
#translate into sql entries
entries = self.buildMatrixOutputEntries(cov, rsids_from_genes, related_rsids, snps_by_rsid)
if not len(entries):
raise NameError("Couldn not build covariance entries for (%s,%s)" %(name,gene))
return entries
def updateFoundCovariance(self, gene, name):
found = None
if gene in self.found_genes_for_covariance:
found = self.found_genes_for_covariance[gene]
logging.info("Gene %s found again for %s", gene, name)
else:
found = []
self.found_genes_for_covariance[gene] = found
found.append(name)
def buildRelatedData(self, rsids_from_genes, snps_by_rsid, weights_in_gene):
related_rsids = []
related_data = []
l = len(rsids_from_genes)
if self.max_snps_in_gene and l > self.max_snps_in_gene:
logging.info("Skipping covariance too large: %d", l)
return related_data, related_rsids
for rsid in rsids_from_genes:
if not rsid in snps_by_rsid:
logging.log(5, "related rsid %s not present in genotype data", rsid)
continue
related_snp = snps_by_rsid[rsid]
freq = sum(related_snp.data)*1.0/(2*len(related_snp.data))
if self.min_maf_filter and self.min_maf_filter > freq:
logging.log(6, "related rsid %s below min maf: %s", rsid, freq)
continue
if self.max_maf_filter and self.max_maf_filter < freq:
logging.log(6, "related rsid %s above max maf: %s", rsid, freq)
continue
data = related_snp.data
weight = weights_in_gene[rsid]
if weight.ref_allele == related_snp.eff_allele and\
weight.eff_allele == related_snp.ref_allele:
logging.log(7, "related rsid %s has alleles flipped compared to model, transforming dosage", rsid)
data = [2-x for x in data]
related_data.append(data)
related_rsids.append(rsid)
return related_rsids, related_data
def buildMatrixOutputEntries(self, matrix, rsids_from_genes, related_rsids, snps_by_rsid):
entries = []
#special case: we might have a single rsid!
if matrix.ndim == 0:
c = str(float(matrix))
r = rsids_from_genes[0]
entries.append((r,r,c))
return entries
for i in range(0, len(rsids_from_genes)):
rsid_i = rsids_from_genes[i]
related_i = -1
if rsid_i in related_rsids:
related_i = related_rsids.index(rsid_i)
for j in range(0, len(rsids_from_genes)):
rsid_j = rsids_from_genes[j]
related_j = -1
if rsid_j in related_rsids:
related_j = related_rsids.index(rsid_j)
value = "NA"
if related_i > -1 and related_j > -1:
value = str(matrix[related_i][related_j])
if i == j:
entries.append((rsid_i, rsid_i, value))
else:
if value == "NA":
if rsid_i < rsid_j:
entries.append((rsid_i, rsid_j, value))
else:
snp_i = snps_by_rsid[rsid_i]
snp_j = snps_by_rsid[rsid_j]
if snp_i.position < snp_j.position:
entries.append((rsid_i, rsid_j, value))
return entries
def addToCorrelationFile(self, weight_db_logic, name, snps, snps_by_rsid):
logging.info("Building correlation database for %s-%s", name, self.weight_db)
genes = list(weight_db_logic.weights_by_gene.keys())
total_genes = len(genes)
last_reported_percent = 0
processed = 0
for gene in genes:
processed += 1
percent = int(processed*100.0 / total_genes)
if percent == last_reported_percent+10:
logging.info("%d percent genes processed", percent)
last_reported_percent = percent
entries = self.buildCorrelationEntries(name, gene, weight_db_logic, snps_by_rsid)
if len(entries) == 0:
logging.log(6,"Gene %s has no snps in current file", gene)
continue
self.addToFile(self.correlation_output, gene, entries)
def buildCorrelationEntries(self, name, gene, weight_db_logic, snps_by_rsid):
weights_in_gene = weight_db_logic.weights_by_gene[gene]
rsids_from_genes = list(weights_in_gene.keys())
#gather as much data as we can work on
related_rsids, related_data = self.buildRelatedData(rsids_from_genes, snps_by_rsid, weights_in_gene)
if len(related_rsids) == 0:
return []
self.updateFoundCorrelation(gene, name)
#correlation matrix of related SNP's data
array = numpy.array(related_data)
cor = numpy.corrcoef(array)
#translate into sql entries
entries = self.buildMatrixOutputEntries(cor, rsids_from_genes, related_rsids, snps_by_rsid)
if not len(entries):
raise NameError("Couldn not build correlation entries for (%s,%s)" %(name,gene))
return entries
def updateFoundCorrelation(self, gene, name):
found = None
if gene in self.found_genes_for_correlation:
found = self.found_genes_for_correlation[gene]
logging.info("Gene %s found again for %s", gene, name)
else:
found = []
self.found_genes_for_correlation[gene] = found
found.append(name)
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser(description='Build correlations and/or covariances from PHASE3 data and weights database.')
parser.add_argument("--verbosity",
help="Log verbosity level. 1 is everything being logged. 10 is only high level messages, above 10 will hardly log anything",
default = "10")
parser.add_argument("--weight_db",
help="name of weight db in data folder",
default="data/DGN-WB_0.5.db")
parser.add_argument("--input_folder",
help="name of folder containing PHASE 3 data",
default="intermediate/TGF_EUR")
parser.add_argument("--correlation_output",
help="Name of file to dump correlation results in.",
default=None)
#default="intermediate/1000GP_Phase3_chr_cor_dgnwb_cor.txt.gz")
parser.add_argument("--covariance_output",
help="Name of file to dump covariance results in. Defaults to 'intermediate/cov/' + file name prefix from '--weight_db' argument",
default=None)
parser.add_argument('--input_format',
help='Input dosage files format. Valid options are: IMPUTE, PrediXcan',
default=Formats.PrediXcan)
parser.add_argument('--min_maf_filter',
help="Filter snps according to this maf",
default=None)
parser.add_argument('--max_maf_filter',
help="Filter snps according to this maf",
default=None)
parser.add_argument("--max_snps_in_gene",
help="Ignore any gene that has snps above this value",
type=int,
default=None)
args = parser.parse_args()
Logging.configureLogging(int(args.verbosity))
work = ProcessWeightDB(args)
work.run()