-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathfind_chrom.py
178 lines (155 loc) · 4.43 KB
/
find_chrom.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
#!/usr/bin/env python
# encoding: utf-8
"""
File: find_chrom.py
Author: Jessica McLaughlin
Last update: 4 October 2016
Info: Given BLASTn output (in .xml format), find the loci mapped to a given chromosome
"""
import os
import sys
import argparse
import re
import vcf
from Bio.Blast import NCBIXML
from phyluce.helpers import FullPaths, is_file, is_dir
from phyluce.log import setup_logging
def get_args():
"""Get arguments from user"""
parser = argparse.ArgumentParser(
description="Given BLASTn output (in .xml format), find the loci mapped to a given chromosome"
)
parser.add_argument(
'--input_xml',
required=True,
action=FullPaths,
type=is_file,
help='the input BLASTn xml files'
)
parser.add_argument(
'--input_vcf',
required=True,
action=FullPaths,
type=is_file,
help='the input BLASTn xml files'
)
parser.add_argument(
'--chromosome',
required=True,
type=str,
help='the chromosome of interest to filter by'
)
parser.add_argument(
'--log-path',
action=FullPaths,
type=is_dir,
default=None,
help='Directory in which to store log file'
)
parser.add_argument(
'--output_vcf',
required=True,
action=FullPaths,
type=is_file,
help='the input BLASTn xml files'
)
parser.add_argument(
"--verbosity",
type=str,
choices=["INFO", "WARN", "CRITICAL"],
default="INFO",
help="""The logging level to use."""
)
return parser.parse_args()
def get_match_records(log,input,chromosome): # parse the blast_records and create dict of matched/unmatched uces
result_handle = open(input)
blast_records = NCBIXML.parse(result_handle)
chrom = 'chromosome '+chromosome
matched_records = []
match_count = 0
for blast_record in blast_records:
for alignment in blast_record.alignments:
for hsps in alignment.hsps:
if chrom in alignment.title:
print blast_record.query
matched_records.append(blast_record.query) # store matching chromosomes w/ key
match_count+=1
log.info(blast_record.query+" is a match!")
else:
pass
log.info(blast_record.query+" is NOT a match!")
log.info(str(match_count)+" records for "+ chrom)
return matched_records
def fix_labels(loci_list,log): # fix labels to match vcf NOTE: SPECIFIC LABELS USED
log.info("fixing labels.....")
new_labels = []
for locus in loci_list:
nospace_locus = re.sub(r'\s?','',locus)
#print nospace_locus
short_locus = re.sub(r'\|uce-?[0123456789]{1,4}','',nospace_locus)
print short_locus
new_labels.append(short_locus)
return new_labels
#def get_names(vcf_reader, loci_list):
vcf_loci = []
# for record in vcf_reader:
# vcf_loci.append(record.CHROM)
# newvcf_loci = list(set(vcf_loci)-set(loci_list))
# for locus in vcf_loci:
# for name in loci_list:
# if locus == name: #and name in vcf_loci:
# if locus not in vcf_loci:
# pass
# else:
# vcf_loci.remove(locus)
# vcf_loci.remove(locus)
# print "YAY!!!!"
# else:
# print "FML"
# return newvcf_loci
def create_vcf(log,input_vcf,output_vcf,use_loci):
try:
reader = vcf.Reader(open(input_vcf, 'r'))
log.info("Reading vcf....")
except:
log.critical("Could not open vcf file")
raise IOError("vcf file "+input_vcf+" could not be opened")
try:
writer = vcf.Writer(open(output_vcf, 'w'), reader)
log.info('Output vcf created')
except:
log.critical('Could not create output vcf file')
raise IOError('Could not create output vcf file')
# names = get_names(reader,use_loci) # get inverse loci list
vcf_loci = []
for record in reader:
vcf_loci.append(record.CHROM)
newvcf_loci = list(set(vcf_loci)-set(use_loci))
try:
reader = vcf.Reader(open(input_vcf, 'r'))
log.info("Reading vcf....")
except:
log.critical("Could not open vcf file")
raise IOError("vcf file "+input_vcf+" could not be opened")
locus_count = 0
for record in reader: # for each record
for locus in newvcf_loci:
if record.CHROM in locus: # write record to new vcf if not on chrom to be excluded
writer.write_record(record)
locus_count+=1
else:
pass
log.info("Printed "+str(locus_count)+" loci to vcf.")
return output_vcf
def main():
# get args
args = get_args()
# setup logging
log, my_name = setup_logging(args)
log.info("Logging set-up")
matches = get_match_records(log,args.input_xml,args.chromosome)
matches_names = fix_labels(matches, log)
vcf.output = create_vcf(log,args.input_vcf,args.output_vcf,matches_names)
log.info("Done!")
if __name__ == '__main__':
main()