forked from PaddlePaddle/PaddleHub
-
Notifications
You must be signed in to change notification settings - Fork 0
/
ahocorasick.py
147 lines (112 loc) · 4.31 KB
/
ahocorasick.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
"""
本模块实现AC自动机封装为Ahocorasick类,用于进行词典的多模匹配。
"""
class Node(object):
"""AC自动机的树结点.
Attributes:
next: dict类型,指向子结点
fail: Node类型,AC自动机的fail指针
length: int类型,判断节点是否为单词
"""
__slots__ = ['next', 'fail', 'length']
def __init__(self):
"""初始化空节点."""
self.next = {}
self.fail = None # fail指针默认为None
self.length = -1
class Ahocorasick(object):
"""实现AC自动机的类
Attributes:
__root: Node类型,AC自动机根节点
"""
def __init__(self):
"""初始化Ahocorasick的根节点__root"""
self.__root = Node()
def add_word(self, word):
"""添加单词word到Trie树中"""
current = self.__root
for char in word:
current = current.next.setdefault(char, Node())
current.length = len(word)
def make(self):
"""构建fail指针路径"""
queue = list()
for key in self.__root.next:
self.__root.next[key].fail = self.__root
queue.append(self.__root.next[key])
# 广度优先算法遍历设置fail指针
while len(queue) > 0:
# 基于当前节点的fail指针设置其子结点的fail指针
current = queue.pop(0)
for k in current.next:
current_fail = current.fail
# 若当前节点有fail指针,尝试设置其子结点的fail指针
while current_fail is not None:
if k in current_fail.next:
current.next[k].fail = current_fail.next[k]
break
current_fail = current_fail.fail
# 若当前节点的fail指针不存在该子结点,令子结点fail指向根节点
if current_fail is None:
current.next[k].fail = self.__root
queue.append(current.next[k])
def search(self, content):
"""后向最大匹配.
对content的文本进行多模匹配,返回后向最大匹配的结果.
Args:
content: string类型, 用于多模匹配的字符串
Returns:
list类型, 最大匹配单词列表,每个元素为匹配的模式串在句中的起止位置,比如:
[(0, 2), [4, 7]]
"""
result = []
p = self.__root
for current_position in range(len(content)):
word = content[current_position]
#
while word not in p.next:
if p == self.__root:
break
p = p.fail
else:
p = p.next[word]
if p.length > 0:
result.append((current_position - p.length + 1, current_position))
return result
def search_all(self, content):
"""多模匹配的完全匹配.
对content的文本进行多模匹配,返回所有匹配结果
Args:
content: string类型, 用于多模匹配的字符串
Returns:
list类型, 所有匹配单词列表,每个元素为匹配的模式串在句中的起止位置,比如:
[(0, 2), [4, 7]]
"""
result = []
p = self.__root
for current_position in range(len(content)):
word = content[current_position]
while word not in p.next:
if p == self.__root:
break
p = p.fail
else:
p = p.next[word]
# 回溯查看是否存在以当前字符结尾的单词
tmp = p
while tmp != self.__root:
if tmp.length > 0:
result.append((current_position - tmp.length + 1, current_position))
tmp = tmp.fail
return result
if __name__ == '__main__':
ah = Ahocorasick()
x = ["百度", "家", "高科技", "科技", "科技公司"]
for i in x:
ah.add_word(i)
ah.make()
string = '百度是家高科技公司'
for begin, end in ah.search_all(string):
print('all:', string[begin:end + 1])
for begin, end in ah.search(string):
print('search:', string[begin:end + 1])