-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathfold_utils.py
260 lines (240 loc) · 9.64 KB
/
fold_utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
import subprocess, os, sys, settings
import re, string, random, itertools
import numpy as np
import warnings
paramfile = 'rna'
def vienna_fold(sequence, constraint=None, bpp=False):
"""
folds sequence using Vienna
args:
sequence is the sequence string
returns:
secondary structure
"""
filename = '%s/%s' % (settings.TEMP_DIR, ''.join(random.sample(string.lowercase,5)))
with open('%s.fa' % filename, 'w') as f:
f.write('>%s\n' % os.path.basename(filename))
f.write('%s\n' % sequence)
if constraint:
options = ' -C'
f.write('%s\n' % constraint)
else:
options = ''
if bpp:
options += ' -p'
else:
options += ' --noPS'
if '&' in sequence:
if sequence.count('&') > 1:
print 'Cannot handle more than 2 strands with Vienna - try the nupack option'
sys.exit()
command = 'RNAcofold'
else:
command = 'RNAfold'
output = subprocess.check_output(os.path.join(settings.VIENNA_DIR,command) + options + ' -T 37.0 < %s' % filename + '.fa', shell=True)
# parse the result
toks = re.search('([AUGC]+)\s*([\.\)\(&]+)\s+\(\s*([-0-9\.]+)\s*\)', output)
if bpp:
bpp_matrix = np.zeros((len(sequence), len(sequence)))
# get info from output file
psfile = '%s_dp.ps' % os.path.basename(filename)
if os.path.isfile(psfile):
with open(psfile) as f:
ps = f.read()
# create bpp matrix from file
lines = re.findall('(\d+)\s+(\d+)\s+(\d*\.*\d*)\s+ubox',ps)
for ii in range(0,len(lines)):
bpp_matrix[int(lines[ii][0])-1, int(lines[ii][1])-1] = float(lines[ii][2])
else:
warnings.warn('dotplot file %s_dp.ps not found' % filename)
os.system('rm %s*' % filename)
os.system('rm %s*' % os.path.basename(filename))
return [toks.group(2), float(toks.group(3)), np.array(bpp_matrix)]
os.system('rm %s*' % filename)
return [toks.group(2), float(toks.group(3))]
def get_orderings(n):
"""
get all possible orderings including last strand
"""
all = []
# loop over number of strands
for i in range(1,n):
# loop over each possible combination
for order in list(itertools.combinations(range(1,n), i)):
# add last strand at each possible position
for j in range(1,i+1):
order_list = list(order)
order_list.insert(j,n)
all.append(order_list)
return all
def nupack_fold(sequence, oligo_conc=1, bpp=False):
"""
finds most prevalent structure using nupack partition function
"""
if '&' in sequence:
return nupack_fold_multi(sequence, oligo_conc, bpp)
else:
return nupack_fold_single(sequence, bpp)
def nupack_fold_single(sequence, bpp=False):
"""
finds most prevalent structure using nupack partition function for single strand
"""
rand_string = ''.join(random.choice(string.ascii_lowercase) for _ in range(6))
filename = '%s/%s' % (settings.TEMP_DIR, rand_string)
options = ['-material', paramfile]
with open('%s.in' % filename, 'w') as f:
f.write('%s\n' % sequence)
p = subprocess.Popen([os.path.join(settings.NUPACK_DIR,'mfe')] + options + [filename], stdout=subprocess.PIPE, stdin=subprocess.PIPE, stderr=subprocess.PIPE)
stdout, stderr = p.communicate()
if p.returncode != 0:
print stdout
print stderr
raise ValueError('mfe command failed for %s' % rand_string)
result = ['.'*len(sequence), 0, [1]]
with open('%s.mfe' % filename) as f:
line = f.readline()
while line:
if not line.startswith('%') and line.strip():
energy = float(f.readline().strip())
secstruct = f.readline().strip()
result = [secstruct, energy, [1]]
break
line = f.readline()
if bpp:
p = subprocess.Popen([os.path.join(settings.NUPACK_DIR,'pairs')] + options + [filename], stdout=subprocess.PIPE, stdin=subprocess.PIPE, stderr=subprocess.PIPE)
stdout, stderr = p.communicate()
bpp_matrix = []
if p.returncode != 0:
print stdout
print stderr
raise ValueError('mfe command failed for %s' % rand_string)
with open('%s.ppairs' % filename) as f:
line = f.readline()
while line:
if not line.startswith('%') and line.strip():
bpp_matrix = nupack_read_bpp(f, len(sequence))
line = f.readline()
result.append(bpp_matrix)
os.system('rm %s*' % filename)
return result
def nupack_fold_multi(sequence, oligo_conc=1, bpp=False):
"""
finds most prevalent structure using nupack partition function for multistrand
"""
rand_string = ''.join(random.choice(string.ascii_lowercase) for _ in range(6))
filename = '%s/%s' % (settings.TEMP_DIR, rand_string)
split = sequence.split('&')
with open('%s.in' % filename, 'w') as f:
f.write('%s\n' % len(split))
for s in split:
f.write('%s\n' % s)
f.write('1\n')
orderings = get_orderings(len(split))
with open('%s.list' % filename, 'w') as f:
for ordering in orderings:
f.write('%s\n' % ' '.join([str(x) for x in ordering]))
with open('%s.con' % filename, 'w') as f:
if isinstance(oligo_conc, list):
assert len(split) == len(oligo_conc) + 1, \
'length of concentrations must be one less than number of strands'
f.write('%s\n' % '\n'.join([str(x) for x in oligo_conc]))
else:
f.write('%s\n' % oligo_conc * (len(split)-1))
f.write('1e-9\n')
options = ['-material', paramfile, '-ordered', '-mfe']#, '-quiet']
if bpp:
options.append('-pairs')
p = subprocess.Popen([os.path.join(settings.NUPACK_DIR,'complexes')] + options + [filename], stdout=subprocess.PIPE, stdin=subprocess.PIPE, stderr=subprocess.PIPE)
stdout, stderr = p.communicate()
if p.returncode != 0:
print stdout
print stderr
raise ValueError('complexes command failed for %s' % rand_string)
p = subprocess.Popen([os.path.join(settings.NUPACK_DIR,'concentrations'), '-ordered', filename], stdout=subprocess.PIPE, stdin=subprocess.PIPE, stderr=subprocess.PIPE)
stdout, stderr = p.communicate()
if p.returncode != 0:
print stdout
print stderr
raise ValueError('concentrations command failed for %s' % rand_string)
# get mfe
complex = False
with open('%s.eq' % filename) as f_eq:
for line in f_eq:
if not line.startswith('%'):
complex = line.strip().split()
if float(complex[len(split)+1]):
break
if not complex:
os.system('rm %s*' % filename)
if bpp:
return ['.'*len(sequence), 0, range(1,len(split)+1), []]
return ['.'*len(sequence), 0, range(1,len(split)+1)]
# get strand ordering
with open('%s.ocx-key' % filename) as f_key:
for line in f_key:
if line.startswith('%s\t%s' % (complex[0], complex[1])):
strands = [int(x) for x in line.strip().split()[2:]]
break
# get mfe structure
with open('%s.ocx-mfe' % filename) as f_mfe:
line = f_mfe.readline()
while line:
if line.startswith('%% complex%s-order%s' % (complex[0], complex[1])):
f_mfe.readline()
energy = f_mfe.readline().strip()
secstruct = f_mfe.readline().strip()
break
line = f_mfe.readline()
# get full secondary structure
for i in range(len(split)):
if i+1 not in strands:
secstruct += '&' + '.'*len(split[i])
strands.append(i+1)
if bpp:
bpp_matrix = []
with open('%s.cx-epairs' % filename) as f_pairs:
line = f_pairs.readline()
while line:
if line.startswith('%% complex%s' % complex[0]):
bpp_matrix = nupack_read_bpp(f_pairs, len(sequence))
break
line = f_pairs.readline()
os.system('rm %s*' % filename)
return [secstruct.replace('+', '&'), float(energy), strands, bpp_matrix]
os.system('rm %s*' % filename)
return [secstruct.replace('+', '&'), float(energy), strands]
def nupack_read_bpp(f, n):
"""
read a base pair probability matrix from a nupack output file
"""
bpp_matrix = np.zeros((n,n))
f.readline()
line = f.readline()
while line and not line.startswith('%'):
bp = line.strip().split()
if int(bp[1])-1 != n:
bpp_matrix[int(bp[0])-1, int(bp[1])-1] = float(bp[2])
line = f.readline()
return bpp_matrix
def nupack_energy(sequence, secstruct):
rand_string = ''.join(random.choice(string.ascii_lowercase) for _ in range(6))
filename = '%s/%s' % (settings.TEMP_DIR, rand_string)
multi = '&' in sequence
split = sequence.split('&')
with open('%s.in' % filename, 'w') as f:
if multi:
f.write('%s\n' % len(split))
for sequence in split:
f.write('%s\n' % sequence)
if multi:
f.write('%s\n' % ' '.join([str(i) for i in secstruct[1]]))
f.write(secstruct[0].replace('&', '+'))
else:
f.write(secstruct.replace('&', '+'))
if '&' in sequence:
options = ['multi']
else:
options = []
result = subprocess.check_output([os.path.join(settings.NUPACK_DIR,'energy')] + options + [filename])
os.system('rm %s*' % filename)
return float(result.strip().split('\n')[-1])