-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathfrequent_words_mismatch_thread.py
61 lines (56 loc) · 1.68 KB
/
frequent_words_mismatch_thread.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
import itertools, time
from multiprocessing import Process
# Number of worker processes: the candidate patterns are dealt into this many
# interleaved slices and one Process is started per slice.
cores = 8
def hamming_distance_problem(p, q):
    """Count the positions of `p` at which `q` holds a different symbol.

    Only the first ``len(p)`` positions are compared: any extra trailing
    symbols in `q` are ignored, while a `q` shorter than `p` raises
    ``IndexError`` when the missing position is reached.
    """
    return sum(1 for idx, symbol in enumerate(p) if symbol != q[idx])
def approx_pattern_matching_count(text, pattern, distance):
    """Count the windows of `text` that match `pattern` approximately.

    Every substring of `text` with the same length as `pattern` is compared
    against it with ``hamming_distance_problem``; a window counts as a match
    when it has at most `distance` mismatches. Overlapping windows are counted
    separately. Returns 0 when `pattern` is longer than `text`.
    """
    width = len(pattern)
    window_starts = range(len(text) - width + 1)
    return sum(
        1
        for start in window_starts
        if hamming_distance_problem(pattern, text[start:start + width]) <= distance
    )
def neighbors(pattern, distance):
    """Return the strings within Hamming distance `distance` of `pattern`.

    The neighbourhood is built recursively over the alphabet 'ACGT': the
    neighbours of the suffix ``pattern[1:]`` are extended by one leading
    symbol. A suffix neighbour that has not yet used up the whole mismatch
    budget may be prefixed with any of the four symbols; one that already
    sits at exactly `distance` mismatches must keep the pattern's own first
    symbol.

    Args:
        pattern: the k-mer to expand (expected to be over 'ACGT').
        distance: maximum number of mismatches allowed (non-negative).

    Returns:
        A list of distinct strings, each as long as `pattern`, in no
        particular order.
    """
    # An empty pattern has only itself as a neighbour. Without this guard the
    # recursion below never terminates for '' because ''[1:] is still ''.
    if distance == 0 or not pattern:
        return [pattern]
    if len(pattern) == 1:
        return list('ACGT')

    first, suffix = pattern[0], pattern[1:]
    neighborhood = set()
    for item in neighbors(suffix, distance):
        if hamming_distance_problem(item, suffix) < distance:
            # Budget left over: the leading symbol is free to mismatch.
            for ch in 'ACGT':
                neighborhood.add(ch + item)
        else:
            # Budget exhausted by the suffix: the leading symbol must match.
            neighborhood.add(first + item)
    return list(neighborhood)
def y(text, patterns, distance):
    """Print the best approximate-match count among `patterns` and its holders.

    Each pattern is scored with ``approx_pattern_matching_count(text, pattern,
    distance)``. One line is written to stdout: the highest score, a space,
    then every pattern that reached that score, space-separated, in the order
    they were encountered. The running best starts at 0, so when nothing
    matches at all every pattern is listed with a count of 0, and an empty
    `patterns` prints just "0 ".

    Args:
        text: the string to search.
        patterns: iterable of candidate patterns to score.
        distance: maximum number of mismatches allowed per window.

    Returns:
        None; the result is only printed.
    """
    best_count = 0
    best_patterns = []
    for pattern in patterns:
        count = approx_pattern_matching_count(text, pattern, distance)
        if count > best_count:
            # New leader: forget everything that tied the previous best.
            best_count = count
            best_patterns = [pattern]
        elif count == best_count:
            best_patterns.append(pattern)
    # A single pre-formatted argument prints the same bytes under the
    # Python 2 print statement and the Python 3 print function.
    print('%s %s' % (best_count, ' '.join(best_patterns)))
def main():
    """Run the frequent-words-with-mismatches search across `cores` processes.

    Reads 'frequent_words_mismatch.txt' from the current directory: the first
    line is the text to search, the second line holds two whitespace-separated
    integers, the k-mer length and the allowed number of mismatches. The
    candidate patterns are split into `cores` interleaved slices and each
    slice is scored by ``y`` in its own process.

    NOTE: every worker prints the best count within its own slice, so the
    output has one line per process and the lines are not merged; the overall
    answer is the line (or lines) carrying the largest count.
    """
    # Read everything up front so the handle is closed before any parsing,
    # even if the parsing below raises.
    with open('frequent_words_mismatch.txt', 'r') as f:
        lines = f.read().splitlines()  # also drops a stray '\r' from CRLF files
    text = lines[0]
    params = lines[1].split()
    kmer = int(params[0])
    distance = int(params[1])

    # Only k-mers within `distance` of some window of the text can have a
    # non-zero count, so collect those neighbourhoods instead of enumerating
    # all 4**kmer possible k-mers.
    candidates = set()
    for i in range(len(text) - kmer + 1):
        candidates.update(neighbors(text[i:i + kmer], distance))

    # Sort so the split into slices does not depend on set iteration order.
    ordered = sorted(candidates)
    wholerange = [ordered[i::cores] for i in range(cores)]

    workers = [
        Process(target=y, args=(text, chunk, distance))
        for chunk in wholerange
    ]
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()


# Keep all script work behind the guard: where multiprocessing starts children
# by re-importing this module, unguarded top-level code would re-read the
# input file and rebuild the candidate set in every child.
if __name__ == '__main__':
    main()