-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtrie.pyx
203 lines (164 loc) · 5.65 KB
/
trie.pyx
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
cimport ctrie
from cpython cimport array
import array
from libcpp cimport bool
# Key and data element types used to instantiate the C++ trie templates;
# both are plain C ints.
ctypedef int KeyT
ctypedef int DataT
# Concrete instantiations of the ctrie templates:
#   cTrie  - the trie node type wrapped by the Trie class
#   cIter  - iterator type wrapped by SubIter
#   cIter1 - iterator type wrapped by TrieIter
#   cIter2 - iterator type wrapped by LevelIter
ctypedef ctrie.Trie[KeyT, DataT] cTrie
ctypedef ctrie.TrieIter[KeyT, DataT] cIter
ctypedef ctrie.TrieIter1[KeyT, DataT] cIter1
ctypedef ctrie.TrieIter2[KeyT, DataT] cIter2
def trie():
    """
    Build a new root trie.

    Instantiates a Trie wrapper and calls create() on it, which allocates
    the underlying C++ trie and marks the wrapper as its owner.
    Returns the created Trie.
    """
    root = Trie()
    return root.create()
cdef trie_warper(cTrie *p):
    """
    Wrap an existing C++ trie node in a Python Trie object.

    The wrapper does not take ownership of the node: its release flag
    stays at the value set by Trie.__cinit__ (0), so deallocating the
    wrapper never deletes the node that `p` points to.
    """
    wrapper = Trie()
    # Borrow the pointer only; the node's memory belongs to whoever created it.
    wrapper._c_trie = p
    return wrapper
cdef class Trie:
    """
    Python wrapper around a C++ ``ctrie.Trie[KeyT, DataT]`` node.

    Keys are sequences of C ints and every node stores one C int value.
    A Trie built with ``trie()`` / ``create()`` owns its C++ storage;
    sub-tries returned by ``insert``, ``setdefault`` and ``find_trie`` are
    non-owning views that keep their parent wrapper alive.

    Usage::

        import trie
        t = trie.trie()
        t.insert([1, 2, 3], 10)
        t.insert([1, 2], 5)
        print(t.find([1, 2, 3]))
        print(t.find([1, 2]))
        print([1, 2] in t)

    Every method that touches the C++ trie raises RuntimeError when the
    wrapper has no trie attached yet (``Trie()`` without ``create()``).
    """
    # Pointer to the wrapped C++ node; NULL until create() or a wrap.
    cdef cTrie *_c_trie
    # True only when this wrapper allocated _c_trie and must delete it.
    cdef bool _need_release
    # For sub-trie views: the wrapper this node was obtained from. Holding it
    # prevents the owning Trie from being deallocated while the view is alive.
    cdef object _owner

    def __cinit__(self):
        self._c_trie = NULL
        self._need_release = False
        self._owner = None

    def __dealloc__(self):
        # Only the wrapper that allocated the C++ trie may free it.
        if self._need_release:
            del self._c_trie

    cdef int _require_created(self) except -1:
        """Raise RuntimeError instead of dereferencing a NULL trie pointer."""
        if self._c_trie is NULL:
            raise RuntimeError('Trie is not created; call create() first')
        return 0

    cdef Trie _wrap_child(self, cTrie *node):
        """Wrap a node reached through this trie and tie its lifetime to self."""
        cdef Trie child = trie_warper(node)
        child._owner = self
        return child

    def create(self):
        """
        Allocate the underlying C++ trie and take ownership of it.

        Returns self so the call can be chained.
        Raises MemoryError if this wrapper already has a trie attached.
        """
        if self._c_trie is not NULL:
            raise MemoryError('Tire has been inilized!')
        self._c_trie = new cTrie()
        self._need_release = True
        return self

    def clean(self):
        """
        Clean the trie by calling Clean() on the C++ node.

        NOTE(review): sub-trie views and iterators obtained earlier presumably
        become invalid after this call - confirm against ctrie.
        """
        self._require_created()
        self._c_trie.Clean()

    def insert(self, key_list, value=None):
        """
        Insert the key sequence `key_list` and return its sub-trie.

        If `value` is not None it is stored on the node; otherwise only the
        node is obtained via InsertTrie and its data is left untouched.

        `key_list` may be any iterable of ints accepted by array.array('i').
        Raises TypeError/OverflowError if keys or value are not C ints.
        """
        cdef DataT new_value = 0
        cdef array.array keys
        cdef bool bFound = False
        cdef cTrie *psub
        # Convert the value first so a non-integer value is rejected before
        # InsertTrie is called.
        if value is not None:
            new_value = value
        keys = array.array('i', key_list)
        self._require_created()
        # len(keys), not len(key_list): the input may be an iterator.
        psub = self._c_trie.InsertTrie(keys.data.as_ints, len(keys), bFound)
        if value is not None:
            psub.GetData()[0] = new_value
        return self._wrap_child(psub)

    def setdefault(self, key_list, value):
        """
        Return the sub-trie for `key_list`, storing `value` only if the node
        was not already present (as reported by InsertTrie).
        """
        cdef array.array keys = array.array('i', key_list)
        cdef bool bFound = False
        cdef cTrie *psub
        self._require_created()
        psub = self._c_trie.InsertTrie(keys.data.as_ints, len(keys), bFound)
        if not bFound:
            psub.GetData()[0] = value
        return self._wrap_child(psub)

    def find(self, key_list):
        """
        Return the value stored under `key_list`.

        Raises IndexError if the keys are not found.
        """
        cdef array.array keys = array.array('i', key_list)
        cdef bool bFound = False
        cdef DataT *pData
        self._require_created()
        pData = self._c_trie.Find(keys.data.as_ints, len(keys), bFound)
        if not bFound:
            raise IndexError('Connot find keys')
        return pData[0]

    def find_trie(self, key_list):
        """
        Return the sub-trie rooted at `key_list`.

        Raises IndexError if the keys are not found.
        """
        cdef array.array keys = array.array('i', key_list)
        cdef bool bFound = False
        cdef cTrie *psub
        self._require_created()
        psub = self._c_trie.FindTrie(keys.data.as_ints, len(keys), bFound)
        if not bFound:
            raise IndexError('Connot find keys')
        return self._wrap_child(psub)

    def __contains__(self, key_list):
        """Return True if Find() reports `key_list` as present."""
        cdef array.array keys = array.array('i', key_list)
        cdef bool bFound = False
        self._require_created()
        self._c_trie.Find(keys.data.as_ints, len(keys), bFound)
        return bFound

    @property
    def data(self):
        """The value stored on this node (read through GetData())."""
        self._require_created()
        return self._c_trie.GetData()[0]

    @data.setter
    def data(self, value):
        self._require_created()
        self._c_trie.GetData()[0] = value
cdef class SubIter:
    """
    Traverse the children of the given trie node.

    Each step yields a ``(key, data)`` tuple: the key filled in by the C++
    iterator's Next() and the data read from the returned child node.

    Parameters:
        t         -- the Trie whose children are visited; must not be None
                     and must already have a C++ trie attached.
        is_sorted -- when true, ctrie.LHash_IncSort is passed as the sort
                     function; otherwise NULL is passed.

    Raises ValueError if `t` has no C++ trie attached.
    """
    cdef cIter *_c_iter
    # The C++ iterator only holds a raw pointer into `t`; keep the wrapper
    # alive so that pointer cannot dangle while iterating.
    cdef Trie _trie

    def __cinit__(self, Trie t not None, bool is_sorted=False):
        if t._c_trie is NULL:
            raise ValueError('cannot iterate over a Trie that is not created')
        self._trie = t
        if is_sorted:
            self._c_iter = new cIter(t._c_trie, ctrie.LHash_IncSort[KeyT])
        else:
            self._c_iter = new cIter(t._c_trie, NULL)

    def __dealloc__(self):
        # Deleting a NULL pointer is a no-op, so this is safe even when
        # __cinit__ raised before the iterator was allocated.
        del self._c_iter

    def init(self):
        """Call Init() on the underlying C++ iterator."""
        self._c_iter.Init()

    def __next__(self):
        cdef KeyT key = 0
        cdef cTrie *p = self._c_iter.Next(key)
        if p is NULL:
            raise StopIteration
        return key, p.GetData()[0]

    def __iter__(self):
        return self
cdef class TrieIter:
    """
    Traverse all the nodes in the whole trie.

    Each step yields a ``(keys, data)`` tuple: `keys` is a list holding the
    first `n` entries of the key buffer, where `n` is filled in by the C++
    iterator's Next(), and `data` is read from the returned node.

    Parameters:
        t           -- the Trie to traverse; must not be None and must
                       already have a C++ trie attached.
        is_sorted   -- when true, ctrie.LHash_IncSort is passed as the sort
                       function; otherwise NULL is passed.
        max_key_len -- number of ints allocated for the key buffer handed to
                       the C++ iterator.
                       NOTE(review): presumably keys longer than this would
                       overflow the buffer - confirm against ctrie.

    Raises ValueError if `t` has no C++ trie attached.
    """
    cdef cIter1 *_c_iter
    # Key buffer written by the C++ iterator; kept as an attribute so the
    # memory stays valid for the iterator's lifetime.
    cdef array.array _context
    # The C++ iterator only holds a raw pointer into `t`; keep the wrapper
    # alive so that pointer cannot dangle while iterating.
    cdef Trie _trie

    def __cinit__(self, Trie t not None, bool is_sorted=False, int max_key_len = 100):
        if t._c_trie is NULL:
            raise ValueError('cannot iterate over a Trie that is not created')
        self._trie = t
        self._context = array.array('i', [0]*max_key_len)
        # The third argument (1) is forwarded unchanged; its meaning is
        # defined by ctrie.TrieIter1 - TODO confirm.
        if is_sorted:
            self._c_iter = new cIter1(t._c_trie, self._context.data.as_ints, 1, ctrie.LHash_IncSort[KeyT])
        else:
            self._c_iter = new cIter1(t._c_trie, self._context.data.as_ints, 1, NULL)

    def __dealloc__(self):
        # Deleting a NULL pointer is a no-op, so this is safe even when
        # __cinit__ raised before the iterator was allocated.
        del self._c_iter

    def init(self):
        """Call Init() on the underlying C++ iterator."""
        self._c_iter.Init()

    def __next__(self):
        cdef int n = 0
        cdef cTrie *p = self._c_iter.Next(n)
        if p is NULL:
            raise StopIteration
        # Copy the key prefix out of the shared buffer so later steps do not
        # change the list already handed to the caller.
        return self._context[0:n].tolist(), p.GetData()[0]

    def __iter__(self):
        return self
cdef class LevelIter:
    """
    Traverse all the nodes at a fixed level of the trie.

    Each step yields a ``(keys, data)`` tuple: `keys` is a list copy of the
    `level`-sized key buffer shared with the C++ iterator, and `data` is
    read from the node returned by Next().

    Parameters:
        t         -- the Trie to traverse; must not be None and must already
                     have a C++ trie attached.
        level     -- the level passed to the C++ iterator; also the number
                     of ints allocated for the key buffer.
        is_sorted -- when true, ctrie.LHash_IncSort is passed as the sort
                     function; otherwise NULL is passed.

    Raises ValueError if `t` has no C++ trie attached.
    """
    cdef cIter2 *_c_iter
    # Key buffer written by the C++ iterator; kept as an attribute so the
    # memory stays valid for the iterator's lifetime.
    cdef array.array _context
    # The C++ iterator only holds a raw pointer into `t`; keep the wrapper
    # alive so that pointer cannot dangle while iterating.
    cdef Trie _trie

    def __cinit__(self, Trie t not None, int level, bool is_sorted=False):
        if t._c_trie is NULL:
            raise ValueError('cannot iterate over a Trie that is not created')
        self._trie = t
        self._context = array.array('i', [0]*level)
        if is_sorted:
            self._c_iter = new cIter2(t._c_trie, self._context.data.as_ints, level, ctrie.LHash_IncSort[KeyT])
        else:
            self._c_iter = new cIter2(t._c_trie, self._context.data.as_ints, level, NULL)

    def __dealloc__(self):
        # Deleting a NULL pointer is a no-op, so this is safe even when
        # __cinit__ raised before the iterator was allocated.
        del self._c_iter

    def init(self):
        """Call Init() on the underlying C++ iterator."""
        self._c_iter.Init()

    def __next__(self):
        cdef cTrie *p = self._c_iter.Next()
        if p is NULL:
            raise StopIteration
        # tolist() copies the buffer so later steps do not change the list
        # already handed to the caller.
        return self._context.tolist(), p.GetData()[0]

    def __iter__(self):
        return self