-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathjoey_utils.py
3804 lines (2990 loc) · 119 KB
/
joey_utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
################################################################################
# General text functions
def str2bool(v):
""" Converts a number of potential string inputs to boolean """
if isinstance(v, bool):
return v
if v.lower() in ('yes', 'true', 't', 'y', '1'):
return True
elif v.lower() in ('no', 'false', 'f', 'n', '0'):
return False
else:
raise ValueError('Boolean value expected.')
def str2int(v, warn=True):
"""
Attempts to convert an input to an integer, returning the original input if
conversion is not feasible
"""
import warnings
try:
return int(v)
except:
if warn:
warnings.warn("Could not convert to an int:\n{}".format(v))
return v
def str2float(v, warn=True):
"""
Attempts to convert an input to a float, returning the original input if
conversion is not feasible
"""
import warnings
try:
return float(v)
except:
if warn:
warnings.warn("Could not convert to a float:\n{}".format(v))
return v
def wrap_string(string, width=60, display=True):
"""
Prints out a string as a set of lines with predefined width
"""
# Determine line breaks
breaks = range(0,len(string), width)
lines = []
# Collect lines
for n, i in enumerate(breaks[:-1]):
lines.append(string[i:breaks[n+1]])
# Edge case to display the last line
if len(string) not in breaks:
lines.append(string[breaks[-1]:])
if display:
for l in lines:
print(l)
return lines
def find_index(query, match):
"""
Find all instances of match in a query string or list and return their
indices.
"""
return [n for n, i in enumerate(query) if i == match]
def extract_digits(string, take_digits=True, split_string=False):
"""
Given a string that includes letters and numbers, will isolate the letters
or numbers as specified by the take_digits argument. If True, all numbers \
will be returned. If False, all non-number characters will be returned. If
the split_string argument is True, will return a list of integers/strings
separated where the string had spaces. Otherwise, will return a single
integer/string.
"""
# Digits cases
if take_digits:
# List of integers
if split_string:
return [str2int(x) for x in ''.join(filter(
lambda i: i.isdigit() or i.isspace(),
string)).split()]
# Single integer
else:
return str2int(''.join(filter(lambda i: i.isdigit(), string)))
# Non-digits cases
else:
# List of strings without digits
if split_string:
return ''.join(filter(lambda i: not i.isdigit(), string)).split()
# Single string without digits
else:
return ''.join(filter(lambda i: not i.isdigit(), string))
def split_string_at_numbers(string):
"""
Given a string with numbers and letters, returns a list split not at spaces,
but rather wherever a string of digits starts or ends. Numbers are converted
to integers.
"""
# Initialize output string
out_str = ''
# Determine first character
is_digit = string[0].isdigit()
# Add each character in input to output, inserting breaks between letters
# and numbers
for char in string:
if is_digit == True:
# Previous and current characters are both digits
if char.isdigit():
out_str += char
# Previous character was digit, current character is not
else:
out_str += '~insert_break_here~'
is_digit = False
out_str += char
else:
# Previous character was not a digit, current character is
if char.isdigit():
out_str += '~insert_break_here~'
is_digit = True
out_str += char
# Previous and current characters are both not digits
else:
out_str += char
return [str2int(i) for i in out_str.split('~insert_break_here~')]
################################################################################
# Protein-specific text functions
def get_aa1_list(mode='A'):
"""
Give a list of the 20 canonical amino acids in 1-letter form
Mode determines the order.
A: alphabetical
C: by conservative AA type cluster (see is_conservative)
"""
# Confirm acceptable mode choice
mode = mode.upper()
if mode not in ['A', 'C']:
raise ValueError('Mode options are "A" (alphabetical) or "C" (cluster)')
if mode == 'A':
return ['A', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'K', 'L',
'M', 'N', 'P', 'Q', 'R', 'S', 'T', 'V', 'W', 'Y']
if mode == 'C':
return ['A', 'F', 'I', 'L', 'M', 'V', 'W', 'Y',
'N', 'Q', 'S', 'T',
'H', 'K', 'R',
'D', 'E',
'C', 'G', 'P']
def get_aa3_list(mode='A'):
"""
Give a list of the 20 canonical amino acids in 3-letter form
Mode determines the order.
A: alphabetical
C: by conservative AA type cluster (see is_conservative)
"""
# Confirm acceptable mode choice
mode = mode.upper()
if mode not in ['A', 'C']:
raise ValueError('Mode options are "A" (alphabetical) or "C" (cluster)')
if mode == 'A':
return ['ALA', 'CYS', 'ASP', 'GLU', 'PHE',
'GLY', 'HIS', 'ILE', 'LYS', 'LEU',
'MET', 'ASN', 'PRO', 'GLN', 'ARG',
'SER', 'THR', 'VAL', 'TRP', 'TYR']
if mode == 'C':
return ['ALA', 'PHE', 'ILE', 'LEU', 'MET', 'VAL', 'TRP', 'TYR',
'ANS', 'GLN', 'SER', 'THR',
'HIS', 'LYS', 'ARG',
'ASP', 'GLU',
'CYS', 'GLY', 'PRO']
def get_aa_dict():
"""
Give a dict of the 20 canonical amino acids with keys in 3-letter form and
values in 1-letter form.
"""
return {'ALA': 'A', 'CYS': 'C', 'ASP': 'D', 'GLU': 'E', 'PHE': 'F',
'GLY': 'G', 'HIS': 'H', 'ILE': 'I', 'LYS': 'K', 'LEU': 'L', 'MET': 'M',
'ASN': 'N', 'PRO': 'P', 'GLN': 'Q', 'ARG': 'R', 'SER': 'S', 'THR': 'T',
'VAL': 'V', 'TRP': 'W', 'TYR': 'Y'}
def convert_aa_name(aa, return_orig=False):
"""
Takes an amino acid name and converts it from 1-letter to 3-letter or vice
versa, depending on input. Only works with 20 canonical amino acids.
The return_orig option sets the response to bad input. If it is true and the
input is not a 1-letter or 3-letter amino acid name, the original input will
be returned. If it is false, an error will be raised if the length is not 1
or 3, or if the string is not in its respective amino acid list.
"""
# Get sequence dict
aa_dict = get_aa_dict()
# If AA is 1-letter or 3-letter length, convert
aa = aa.upper()
if len(aa) in [1, 3]:
# If length is 1 instead of 3, invert the aa_dict
if len(aa) == 1:
aa_dict = invert_dictionary(aa_dict)
# If AA is a canonical AA, convert the name
if aa in aa_dict:
return aa_dict[aa]
# If not, either return input or raise an error, as specified
elif return_orig:
return aa
else:
raise KeyError(aa)
# If the length of input is not 1 or 3, either return input or raise an
# error, as specified
else:
if return_orig:
return aa
else:
raise ValueError(
'Amino acids must be either 1-letter or 3-letter strings')
def random_aa(seq_length, exclude_C=False):
"""
Returns a string of random 1-letter amino acid names from the cannonical
20, to a specified length. exclude_C allows cysteines to be excluded as
a candidate amino acid.
"""
from random import randint
aa_list = get_aa1_list()
# Exclude cysteine
if exclude_C:
aa_list.remove('C')
aa_string = ""
for aa in range(seq_length):
rand_index = randint(1, len(aa_list)) - 1
aa_string += aa_list[rand_index]
return aa_string
def aa_to_yeast_codon(aa_name):
"""
Given a 1-letter amino acid name, returns the preferred yeast codon for that
amino acid.
"""
yeast_codons={'A':'GCT', 'C': 'TGC', 'D': 'GAC', 'E': 'GAG', 'F': 'TTT',
'G': 'GGA', 'H': 'CAC', 'I': 'ATC', 'K': 'AAA', 'L': 'CTT',
'M': 'ATG', 'N': 'AAT', 'P': 'CCA', 'Q': 'CAG', 'R': 'CGT',
'S': 'TCA', 'T': 'ACG', 'V': 'GTA', 'W': 'TGG', 'Y': 'TAC'}
return yeast_codons[aa_name.upper()]
def make_dna_from_peptide_sequence(peptide_sequence):
"""
Given a peptide sequence string using canonical amino acids, returns a yeast
DNA string encoding it.
"""
dna_seq = ''
for aa in peptide_sequence:
dna_seq += aa_to_yeast_codon(aa)
return dna_seq
def is_conservative(wt_aa, mut_aa):
"""
Input two single-letter strings representing wild type and mutant AAs.
Returns a Boolean of whether a change from the first residue to the second
represents a conservative substitution.
Uses groupings of amino acids from Wikipedia:
https://en.wikipedia.org/wiki/Amino_acid#/media/File:Amino_Acids.svg
Staying within B, or D will be conservative.
Staying withing charge group in A will be conservative.
Any special case is nonconservative.
"""
res_groups = [ ['A', 'F', 'I', 'L', 'M', 'V', 'W', 'Y'],
['C'],
['D', 'E'],
['G'],
['H', 'K', 'R'],
['N', 'Q', 'S', 'T'],
['P'],
['U']]
# Identify in which group the original residue belongs
for rg in res_groups:
if wt_aa in rg:
original_group = rg
break
# Check whether the substituted residue is in the same group
conservation = mut_aa in original_group
return conservation
################################################################################
# List manipulation functions
def join_list(item_list, joiner=''):
"""
Convert a list into a string of items. By default, there will be nothing
between joined items, though different joiner strings can be used. List
does not need to be converted to strings first.
"""
return joiner.join([str(i) for i in item_list])
def clean_list(in_list, to_type=None, nonredundant=True, sort=True):
"""
Given an input list, returns a cleaned list.
- If given a type (str, int, float), will attempt to make all
elements into that type, leaving un-convertable elements unchanged
- Removes redundancies (true by default)
- Sorts (true by default)
"""
# Catch improper inputs for object type
if to_type not in [None, str, int, float]:
err_text = 'to_type must be str, int, float'
raise ValueError(err_text)
# Create modified list
out_list = in_list[:]
# Cast type
if to_type == str:
out_list = [str(i) for i in out_list]
if to_type == int:
out_list = [str2int(i) for i in out_list]
if to_type == float:
out_list = [str2float(i) for i in out_list]
# Remove redundancies from list
if nonredundant:
out_list = list(set(out_list))
# Sort list
if sort:
out_list = sorted(out_list)
return out_list
def partition_list(in_list, partitions, member):
"""
Given a list, a number of partitions, and a partition member number,
splits the list into the appropriate number of partitions, and returns
the specified member. Member should be 1-indexed, so the minimum is 1 and
maximum is the number of partitions.
"""
# Confirm appropriate member number
assert 1 <= member <= partitions
# Determine list length and partition size
list_size = len(in_list)
partition_size = int(list_size/partitions)
overrun = list_size % partitions
# Determine starting index to collect. If the list doesn't break into equal
# partitions, the earlier partitions will have one extra element
start_index = (member - 1) * partition_size
if member > overrun:
start_index += overrun
else:
start_index += member - 1
# Determine end index to collect
end_index = start_index + partition_size
if member <= overrun:
end_index += 1
return in_list[start_index:end_index]
def flatten_nested_lists(in_list, sort=True, nonredundant=True):
"""
Takes a list of lists and flattens it into a single list. Can optionally
remove redundant entries (this is the default behavior).
"""
# Make flattened list
flat_list = [item for sublist in in_list for item in sublist]
# Sort the list if desired
if sort:
flat_list.sort()
# Remove redundancies
if nonredundant:
non_redundant_list = []
for item in flat_list:
if item not in non_redundant_list:
non_redundant_list.append(item)
return non_redundant_list
else:
return flat_list
def variable_sliding_window(inp, min_size=0, max_size=0):
"""
Takes a string or list input and returns a list of frames in that input. The
frame size increases with each iteration. Thus, running on an input of 'str'
with produce the output ['s', 't', 'r', 'st', 'tr', 'str']. Default behavior
will go from a frame size of 1 up to the full size of the input. Optionally,
this can be constrained to a set window size range.
"""
# Initialize output list
out_windows_list = []
# Set initial window size
if min_size:
window_size = min_size
else:
window_size = 1
# Set final window size
if max_size:
window_max = max_size
else:
window_max = len(inp) + 1
# Outer loop with increasing window size
while window_size <= window_max:
frame_start = 0
frame_end = frame_start + window_size
# Inner loop sliding the window
while frame_end <= len(inp):
# Add frame to output list
out_windows_list.append(inp[frame_start:frame_end])
# Increment start of frame and end of frame
frame_start += 1
frame_end = frame_start + window_size
# Increment window size
window_size += 1
return out_windows_list
def find_nearest_in_array(array, value, greater=False, lesser=False):
"""
Find the index of the closest value in an array to a query value. The
array does not need to be sorted. By default, selects either greater or
lesser, whichever is closest. greater or lesser can be set to true (not
both) to force one direction.
"""
import numpy as np
assert not (greater == True and lesser == True)
# Convert input into a numpy array (might be a list, etc)
array = np.asarray(array)
# Take differences
difs_array = array - value
# If seeking nearest greater, exclude negatives
if greater:
difs_array = np.where(difs_array > 0, difs_array, np.inf)
# If seeking nearest lesser, exclude positives
if lesser:
difs_array = np.where(difs_array < 0, difs_array, np.inf)
# If no nearest value is found, return None
if all(np.isinf(difs_array)):
return None
# Return the index of the nearest value
return (np.abs(difs_array)).argmin()
def invert_dictionary(dictionary):
"""
Takes a dict object and makes a copy where the values of the input are the
keys of the output and vice versa.
"""
return {v: k for k, v in dictionary.items()}
################################################################################
# Math functions
def ceilm(number, multiple):
"""
Returns a float rounded up by a factor of the multiple specified
"""
from math import ceil
return ceil(float(number) / multiple) * multiple
def floorm(number, multiple):
"""
Returns a float rounded down by a factor of the multiple specified
"""
from math import floor
return floor(float(number) / multiple) * multiple
def roundm(number, multiple):
"""
Returns a float rounded to a factor of the multiple specified
"""
return round(float(number) / multiple) * multiple
def opt_r2(data_column, gmm, min_bins=5, max_bins=1000):
"""
Given a 1-dimensional set of data and a gaussian model that fits it, cycles
through a range of bin counts (default 5-1000) to determine the bin count
that yields the best R2 value, and returns that R2 value and the bin count.
"""
import numpy as np
from sklearn.metrics import r2_score
best_i = min_bins
best_r2 = -10
for i in range(min_bins, max_bins):
x = np.linspace(data_column.min(), data_column.max(), i)
y1 = np.histogram(data_column, bins=i)[0]
y1_norm = np.linalg.norm(y1)
y2 = np.exp(gmm.score_samples(x.reshape(-1, 1)))
r2 = r2_score(y1/y1_norm, y2)
if r2 > best_r2:
best_i = i
best_r2 = r2
return best_r2, best_i
def gaussfit(x_array, y_array, components=1):
"""
Normal/gaussian mixtures fitting of data. Fits a model with a specified
number of gaussian components to the y_array data with range and resolution
specified by the x_array (which is usually a numpy.linspace). Returns the
model and a dataframe with the parameter values.
"""
import numpy as np
import pandas as pd
from sklearn.mixture import GaussianMixture
# Fit model
gmm = GaussianMixture(n_components=components).fit(X=y_array)
gmm_y = np.exp(gmm.score_samples(x_array.reshape(-1, 1)))
# Collect fit parameters
gmm_params = {}
for i in range(components):
gmm_params['μ' + str(i + 1)] = [np.reshape(gmm.means_, components)[i]]
gmm_params['σ' + str(i + 1)] = [np.reshape(gmm.covariances_, components)[i]]
gmm_params['wt' + str(i + 1)] = [np.reshape(gmm.weights_, components)[i]]
gmm_params['R2'] = [opt_r2(y_array, gmm)[0]]
gmm_params = pd.DataFrame(gmm_params)
return gmm_y, gmm_params
def sum_squares(array):
"""
Calculates the sum of squares for all values in an array
"""
return sum(i ** 2 for i in array)
def interquartile_bounds(array):
"""
For a given array, calculates the boundaries for outlier classification,
based on being more than 1.5 * (interquartile range) above or below the
quartiles. Returns lower and upper bounds.
"""
import numpy as np
# Determine quartiles
q1 = np.percentile(array, 25, interpolation='midpoint')
q3 = np.percentile(array, 75, interpolation='midpoint')
# Determine interquartile range
iqr = q3 - q1
# Determine bounds
lower = q1 - 1.5 * iqr
upper = q3 + 1.5 * iqr
return lower, upper
def calc_distance(c1, c2):
"""
Returns the distance between two XYZ coordinate vectors
"""
import numpy as np
return np.linalg.norm(np.array(c1) - np.array(c2))
def calc_angle(c1, c2, c3):
"""
Calculate the angle between three points, about the middle point, c2
Returns value in degrees
"""
import numpy as np
# Get triangle legs
l1 = np.array(c1) - np.array(c2)
l2 = np.array(c3) - np.array(c2)
# Determine angle
cosine_angle = np.dot(l1, l2) / (np.linalg.norm(l1) * np.linalg.norm(l2))
angle = np.arccos(cosine_angle)
return np.degrees(angle)
def calc_dihedral(c1, c2, c3, c4):
"""
Calculate the dihedral/torsion angle between four points, between c2 and c3
Returns value in degrees
"""
import numpy as np
# Get dihedral vectors
v1 = np.array(c1) - np.array(c2)
v2 = np.array(c3) - np.array(c2)
v3 = np.array(c4) - np.array(c3)
# Normalize v2 magnitude
v2 /= np.linalg.norm(v2)
# Vector Projections
## p1 = projection of v1 onto plane perpendicular to v2
## p2 = projection of v3 onto plane perpendicular to v2
p1 = v1 - np.dot(v1, v2)*v2
p2 = v3 - np.dot(v3, v2)*v2
# Determine torsion angle
x = np.dot(p1, p2)
y = np.dot(np.cross(v2, p1), p2)
return np.degrees(np.arctan2(y, x))
################################################################################
# General file reading and editing
def readfile(file_name):
""" Opens a file in read-mode and returns a list of the text lines """
with open(file_name, 'r') as r:
lines = r.readlines()
return lines
def is_zipped(fi):
"""
Check whether the end of the file extension is '.gz'. Returns a boolean.
"""
from os.path import splitext
return bool(splitext((fi))[-1] == '.gz')
def unzip_file(fi):
"""
Read in a gzipped file and save an unzipped version, stripping off .gz
from the file name ending, and returning that rename.
"""
import subprocess
if is_zipped(fi):
# Unzip the file
subprocess.call(['gunzip', fi])
# Remove .gz from filename
fi = fi.rstrip('.gz')
else:
print('Warning: File is not zipped: \n{}'.format(fi))
return fi
def zip_file(fi):
"""
Read in an unzipped file and save compress it, adding .gz to the file name
ending, and returning that rename.
"""
import subprocess
if not is_zipped(fi):
# Unzip the file
subprocess.call(['gzip', fi])
# Add .gz to filename
fi = fi + '.gz'
else:
print('Warning: File is already zipped: \n{}'.format(fi))
return fi
def gzip_file(infile, outfile):
""" gzips a file """
import gzip
with open(infile, 'rb') as f_in:
with gzip.open(outfile, 'wb') as f_out:
f_out.writelines(f_in)
return outfile
def unzip_gz(infile, outfile):
""" Unzips a gzip file """
import gzip
import shutil
with gzip.open(infile, 'rb') as f_in:
with open(outfile, 'wb') as f_out:
shutil.copyfileobj(f_in, f_out)
return outfile
def find_maybe_compressed_file(fi, gunzip=False, gzip=False):
"""
Given a filename and path for a file that may be gzipped, returns a filename
correcting for a .gz extension. Can unzip of zip the file if desired, in
which case the name will correspond to the desired condition of the file.
"""
from os.path import isfile
# Input name is correct
if isfile(fi):
outname = fi
# Input name is uncompressed, file is compressed
elif isfile(fi + '.gz'):
outname = fi + '.gz'
# Input name is compressed, file is uncompressed
elif isfile(fi.rstrip('.gz')):
outname = fi.rstrip('.gz')
# File wasn't found either way
else:
err_text = 'Cannot find this file (zipped or unzipped): \n{}'
raise ValueError(err_text.format(fi))
# Unzipping
if gunzip and ('.gz' in outname):
outname = unzip_file(outname)
# Zipping
if gzip and ('.gz' not in outname):
outname = zip_file(outname)
return outname
def copy_file(input_filename, output_filename, gunzip=False, gzip=False):
"""
Copies a file. Differs from default shutil copyfile in that there is a check
for whether the file is zipped, and options to zip or unzip it.
"""
from shutil import copyfile
# Get input file
input_filename = find_maybe_compressed_file(
input_filename, gunzip=gunzip, gzip=gzip)
# Make sure input and output zipped conditions match
if not is_zipped(input_filename) and is_zipped(output_filename):
output_filename = output_filename.rstrip('.gz')
if is_zipped(input_filename) and not is_zipped(output_filename):
output_filename = output_filename + '.gz'
copyfile(input_filename, output_filename)
return output_filename
def move_file(input_filename, output_filename, gunzip=False, gzip=False):
"""
Moves a file. Differs from default shutil move in that there is a check
for whether the file is zipped.
"""
from shutil import move
# Get input file
input_filename = find_maybe_compressed_file(
input_filename, gunzip=gunzip, gzip=gzip)
move(input_filename, output_filename)
return output_filename
def delete_file(file_name):
""" Deletes a file """
from os import remove
remove(find_maybe_compressed_file(file_name))
return
def out_directory(directory_name):
"""
Given an output directory string, checks whether that directory exists.
Creates it if it doesn't. If no name is given, returns an empty string.
"""
from os import makedirs
from os.path import isdir
if directory_name:
if not isdir(directory_name):
makedirs(directory_name)
return directory_name
else:
return ''
def output_file_name(filename, path='', extension='',
prefix=None, suffix=None, mkdir=True, dont_strip_gz=False):
"""
Given a filename, can add prefix, suffix, and/or path to the filename.
If the input file includes a path, that path will be retained by default
(an empty string) but can be set to a value or changed to None. The same
is true of the file extension. Detection of file extension locates the last
period (.), so that is where suffixes will be added, though .gz extensions
will be stripped off by default. Prefixes and suffixes will be appended
with underscores (_). If mkdir is True and the path does not exist, it
will be created, assuming that there is a path.
"""
from os.path import basename, dirname, join, splitext
# Remove .gz ending
if not dont_strip_gz:
filename = filename.rstrip('.gz')
# Get file path, basename, and extension
pathname = dirname(filename)
outname, ext = splitext(basename(filename))
# Overwrite path if given a user input, or set to default, or leave empty
if path:
pathname = path
elif isinstance(path, str) and not path:
pathname = pathname
else:
pathname = ''
# Create directory if it doesn't exist
if mkdir and pathname != '':
pathname = out_directory(pathname)
# Add prefix
if prefix != None:
outname = '_'.join([str(prefix), outname])
# Add suffix
if suffix != None:
outname = '_'.join([outname, str(suffix)])
# Overwrite extension if given a user input, or set to default, or leave
if extension:
outname = '.'.join([outname, extension])
elif isinstance(extension, str) and ext != '':
outname = '.'.join([outname, ext.lstrip('.')])
return join(pathname, outname)
################################################################################
# Fasta functions (many functions require BioPython)
def parse_fastafile(fasta_file):
"""
Read in a file with a list of fasta sequences and return a list of biopython
SeqRecord objects for all sequences
"""
from Bio import SeqIO
# Initialize list
fasta_list = []
# Populate list
for r in SeqIO.parse(fasta_file, 'fasta'):
fasta_list.append(r)
return fasta_list
def global_align(seq1, seq2, one_alignment_only=False, id_reward=1,
mismatch_penalty=-0.2, open_gap_penalty=-2, extend_gap_penalty=-0.1,
penalize_end_gaps=False, score_only=False):
"""
Performes a global alignment using Biopython's pairwise2 tool.
"""
from Bio import pairwise2
return pairwise2.align.globalms(
seq1, seq2, id_reward, mismatch_penalty, open_gap_penalty,
extend_gap_penalty, penalize_end_gaps=penalize_end_gaps,
score_only=score_only, one_alignment_only=one_alignment_only)
def generate_formatted_aligment(alignment, full=True):
"""
Input a biopython alignment to generate a formatted alignment display.
Full means that the full sequence will be included, not just the aligned
portion.
"""
from Bio.pairwise2 import format_alignment
# Generate alignment string
alignment_string = format_alignment(*alignment, full_sequences=full)
return alignment_string
def split_alignment_string(alignment_string, check_len=True):
"""
Strings output by biopython's format_alignment function are often more
convenient as a list than as a string. This function splits the string out
and then checks that all sequences are consistent in length. The first three
items in the list are respectively the query sequence, the identity between
sequences, and the subject sequence.
"""
# Split into sections
alignment_list = alignment_string.split('\n')
# Confirm that all sequences are consistent length
if check_len:
try:
assert len(alignment_list[0]) == len(alignment_list[1])
assert len(alignment_list[0]) == len(alignment_list[2])
except AssertionError:
d = {0: 'query', 1: 'identity', 2: 'subject'}
print('Sequences were not the same length')
for i in range(3):
print(d[i])
print(len(alignment_list[i]))
print(alignment_list[i])
print()
raise
return alignment_list
def calculate_alignment_length(alignment):
"""
Determines the length of alignment between two sequences, based on a
biopython sequence alignment.
"""
# Formatting alignment to create a demarcation string
alignment_string = generate_formatted_aligment(alignment)
# Split alignment string into sections representing different sequences
# The demarcation string is the second element of the list
a_list = split_alignment_string(alignment_string)
# Determine length, excluding unaligned gaps
return len([i for i in a_list[1] if i != ' '])
def display_alignment(alignment, width=80):
"""
Input a biopython alignment to print out a formatted alignment display.
Setting width value will change the number of characters per line of