forked from broadinstitute/dockstore-tool-cms2
-
Notifications
You must be signed in to change notification settings - Fork 0
/
hapset_to_vcf.py
executable file
·62 lines (52 loc) · 3 KB
/
hapset_to_vcf.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
#!/usr/bin/env python3
"""Convert .tped files to .vcf ."""
import contextlib
import itertools
import operator
import os
import os.path
import sys
import misc_utils
def hapset_to_vcf(hapset_manifest_json_fname, out_vcf_basename, sel_pop):
"""Convert a hapset to an indexed .vcf.gz"""
hapset_dir = os.path.dirname(hapset_manifest_json_fname)
hapset_manifest = misc_utils.json_loadf(hapset_manifest_json_fname)
pops = hapset_manifest['popIds']
with open(out_vcf_basename + '.vcf', 'w') as out_vcf:
out_vcf.write('##fileformat=VCFv4.2\n')
out_vcf.write('##FORMAT=<ID=GT,Number=1,Type=String,Description="Genotype">\n')
vcf_cols = ['#CHROM', 'POS', 'ID', 'REF', 'ALT', 'QUAL', 'FILTER', 'INFO', 'FORMAT']
with open(out_vcf_basename + '.case.txt', 'w') as out_case, \
open(out_vcf_basename + '.cont.txt', 'w') as out_cont:
for pop in pops:
for hap_num_in_pop in range(hapset_manifest['pop_sample_sizes'][pop]):
sample_id = f'{pop}_{hap_num_in_pop}'
vcf_cols.append(sample_id)
(out_case if pop == sel_pop else out_cont).write(f'{pop}\t{sample_id}\n')
out_vcf.write('\t'.join(vcf_cols) + '\n')
with contextlib.ExitStack() as exit_stack:
tped_fnames = [os.path.join(hapset_dir, hapset_manifest['tpeds'][pop])
for pop in hapset_manifest['popIds']]
tpeds = [exit_stack.enter_context(open(tped_fname)) for tped_fname in tped_fnames]
def make_tuple(*args): return tuple(args)
tped_lines_tuples = map(make_tuple, *map(iter, tpeds))
for tped_lines_tuple in tped_lines_tuples:
# make ref allele be A for ancestral and then D for derived?
tped_lines_fields = [line.strip().split() for line in tped_lines_tuple]
misc_utils.chk(len(set(map(operator.itemgetter(3), tped_lines_fields))) == 1,
'all tpeds in hapset must be for same pos')
vcf_fields = ['1', tped_lines_fields[0][3], '.', 'A', 'D', '.', '.', '.', 'GT']
for pop, tped_line_fields_list in zip(pops, tped_lines_fields):
for hap_num_in_pop in range(hapset_manifest['pop_sample_sizes'][pop]):
vcf_fields.append('0' if tped_line_fields_list[4 + hap_num_in_pop] == '1' \
else '1')
out_vcf.write('\t'.join(vcf_fields) + '\n')
# end: for tped_lines_tuple in tped_lines_tuples
# end: with contextlib.ExitStack() as exit_stack
# end: with open(out_vcf_fname, 'w') as out_vcf
misc_utils.execute(f'bgzip -f -i {out_vcf_basename}.vcf')
misc_utils.execute(f'bcftools index {out_vcf_basename}.vcf.gz')
# end: def hapset_to_vcf(hapset_manifest_json_fname, out_vcf_fname)
if __name__ == '__main__':
print(misc_utils.available_cpu_count())
hapset_to_vcf(sys.argv[1], sys.argv[2], sys.argv[3])