# -*- coding: utf-8 -*-
import os
import pickle
import numpy as np
Triplet_End = '</s>'
Inner_Interval = '<d>'
Sentence_Start = '<e>'
Sentence_End = '</e>'
# This has a vocab id, which is used to pad the encoder input, decoder input and target sequence
PAD_TOKEN = '<pad>'
# This has a vocab id, which is used to represent out-of-vocabulary words
UNK_TOKEN = '<unk>'
# # This has a vocab id, which is used at the start of every decoder input sequence
# BOS_TOKEN = '<bos>'
# # This has a vocab id, which is used at the end of untruncated target sequences
# EOS_TOKEN = '<eos>'
# This has a vocab id, which is used at the start of every decoder input sequence
START_DECODING = '<start>'
# This has a vocab id, which is used at the end of untruncated target sequences
STOP_DECODING = '<end>'
def load_word_vec(path, word2idx=None, embed_dim=300):
    word_vec = {}
    with open(path, 'r', encoding='utf-8', newline='\n', errors='ignore') as fin:
        for line in fin:
            tokens = line.rstrip().split()
            # The last embed_dim fields are the vector; everything before them
            # (possibly several tokens) is the word itself.
            word, vec = ' '.join(tokens[:-embed_dim]), tokens[-embed_dim:]
            if word2idx is None or word in word2idx:
                word_vec[word] = np.asarray(vec, dtype='float32')
    return word_vec
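

# Illustrative note (not part of the original file): each line of the GloVe
# file is expected to look like "word 0.013 -0.241 ... 0.102", with exactly
# embed_dim float fields after the word. A hedged usage sketch:
#     vecs = load_word_vec('./glove/glove.840B.300d.txt',
#                          word2idx={'good': 5}, embed_dim=300)
#     # vecs == {'good': array([...], dtype=float32)} if 'good' appears in the file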
def build_embedding_matrix(data_dir, word2idx, embed_dim, type):
    embedding_matrix_file_name = '{0}_{1}_embedding_matrix.pkl'.format(
        str(embed_dim), type)
    if os.path.exists(os.path.join(data_dir, embedding_matrix_file_name)):
        print('>>> loading embedding matrix:', embedding_matrix_file_name)
        with open(os.path.join(data_dir, embedding_matrix_file_name), 'rb') as f:
            embedding_matrix = pickle.load(f)
    else:
        print('>>> loading word vectors ...')
        # words not found in the embedding index are randomly initialized
        embedding_matrix = np.random.uniform(
            -1 / np.sqrt(embed_dim), 1 / np.sqrt(embed_dim),
            (len(word2idx), embed_dim))
        # <pad> (id 0) is always the all-zero vector
        embedding_matrix[0, :] = np.zeros((1, embed_dim))
        fname = './glove/glove.840B.300d.txt'
        word_vec = load_word_vec(fname, word2idx=word2idx, embed_dim=embed_dim)
        print('>>> building embedding matrix:', embedding_matrix_file_name)
        for word, i in word2idx.items():
            vec = word_vec.get(word)
            if vec is not None:
                embedding_matrix[i] = vec
        with open(os.path.join(data_dir, embedding_matrix_file_name), 'wb') as f:
            pickle.dump(embedding_matrix, f)
    return embedding_matrix
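

# Usage sketch (illustrative, not from the original file): the matrix rows are
# aligned with the tokenizer's word ids, so it can be loaded into an embedding
# layer directly. './data/14lap' below is a hypothetical data directory:
#     tokenizer = build_tokenizer('./data/14lap')
#     matrix = build_embedding_matrix('./data/14lap', tokenizer.word2idx, 300, '14lap')
#     # e.g. with PyTorch (assuming torch is installed):
#     # embedding = torch.nn.Embedding.from_pretrained(
#     #     torch.tensor(matrix, dtype=torch.float))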
class Tokenizer(object):
    # Special tokens are registered first, in this fixed order, so that <pad>
    # is id 0 and <unk> is id 1 (the embedding matrix and the OOV handling
    # below rely on that). BOS_TOKEN/EOS_TOKEN are commented out at module
    # level and are therefore not registered here either.
    SPECIAL_TOKENS = [PAD_TOKEN, UNK_TOKEN, START_DECODING, STOP_DECODING,
                      Sentence_Start, Sentence_End, Inner_Interval, Triplet_End,
                      'positive', 'negative', 'neutral']

    def __init__(self, word2idx=None):
        if word2idx is None:
            self.word2idx = {}
            self.idx2word = {}
            self.idx = 0
            for token in self.SPECIAL_TOKENS:
                self.word2idx[token] = self.idx
                self.idx2word[self.idx] = token
                self.idx += 1
        else:
            self.word2idx = word2idx
            self.idx2word = {v: k for k, v in word2idx.items()}
            self.idx = len(word2idx)
    def fit_on_text(self, text):
        text = text.lower()
        words = text.split()
        for word in words:
            if word not in self.word2idx:
                self.word2idx[word] = self.idx
                self.idx2word[self.idx] = word
                self.idx += 1
    def text_to_sequence(self, text):
        text = text.lower()
        words = text.split()
        unknownidx = self.unk()
        sequence = [self.word2idx[w]
                    if w in self.word2idx else unknownidx for w in words]
        if len(sequence) == 0:
            sequence = [0]
        return sequence

    def __len__(self):
        '''Returns the size of the vocabulary.'''
        return len(self.word2idx)
    def word2id(self, word):
        '''Returns the id (integer) of a word (string); the <unk> id if the word is OOV.'''
        unk_id = self.unk()
        return self.word2idx.get(word, unk_id)

    def id2word(self, word_id):
        '''Returns the word string corresponding to an id (integer).'''
        if word_id not in self.idx2word:
            raise ValueError(f'Id not found in vocab: {word_id}')
        return self.idx2word[word_id]

    def size(self):
        '''Returns the total size of the vocabulary.'''
        return len(self.word2idx)
    def start(self):
        return self.word2id(START_DECODING)

    def stop(self):
        return self.word2id(STOP_DECODING)

    def pad(self):
        return self.word2idx[PAD_TOKEN]

    def unk(self):
        return self.word2idx[UNK_TOKEN]

    # BOS_TOKEN and EOS_TOKEN are commented out at module level, so the
    # corresponding accessors are disabled as well; use start()/stop() instead.
    # def eos(self):
    #     return self.word2idx[EOS_TOKEN]
    # def bos(self):
    #     return self.word2idx[BOS_TOKEN]

    def sentence_start(self):
        return self.word2idx[Sentence_Start]

    def sentence_end(self):
        return self.word2idx[Sentence_End]

    def triplet_end(self):
        return self.word2idx[Triplet_End]

    def inner_Interval(self):
        return self.word2idx[Inner_Interval]
    def extend(self, oovs):
        '''Returns the in-vocab words (ordered by id) extended with the source OOVs.'''
        extended_vocab = [self.idx2word[i]
                          for i in range(len(self.idx2word))] + list(oovs)
        return extended_vocab

    def tokens2ids(self, tokens):
        ids = [self.word2id(t) for t in tokens]
        return ids
    def source2ids_ext(self, src_tokens):
        """Maps source tokens to ids if in vocab, extended vocab ids if OOV.
        Args:
            src_tokens: list of source text tokens
        Returns:
            ids: list of source text token ids
            oovs: list of OOVs in the source text
        """
        ids = []
        oovs = []
        unk_id = self.word2idx[UNK_TOKEN]
        for t in src_tokens:
            t_id = self.word2id(t)
            if t_id == unk_id:
                if t not in oovs:
                    oovs.append(t)
                # OOVs get temporary ids just past the end of the vocabulary.
                ids.append(self.size() + oovs.index(t))
            else:
                ids.append(t_id)
        return ids, oovs
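
    # Worked example (illustrative): if 'dies' is out of vocabulary, then
    #     ids, oovs = tokenizer.source2ids_ext(['the', 'battery', 'dies'])
    # yields oovs == ['dies'], and the position of 'dies' in ids holds
    # tokenizer.size() + 0, i.e. a temporary extended-vocab id that the copy
    # mechanism can point at instead of <unk>.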
    def target2ids_ext(self, tgt_tokens, oovs):
        """Maps target text to ids, using the extended vocab (vocab + oovs).
        Args:
            tgt_tokens: list of target text tokens
            oovs: list of OOVs from the source text (copy mechanism)
        Returns:
            ids: list of target text token ids
        """
        ids = []
        unk_id = self.word2idx[UNK_TOKEN]
        for t in tgt_tokens:
            t_id = self.word2id(t)
            if t_id == unk_id:
                if t in oovs:
                    ids.append(self.size() + oovs.index(t))
                else:
                    ids.append(unk_id)
            else:
                ids.append(t_id)
        return ids
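
    # Worked example (illustrative): if 'dies' was collected as a source OOV
    # (oovs == ['dies']), then 'dies' in the target also maps to
    # tokenizer.size() + 0, so the decoder target can reference the copied
    # source token; a target OOV that is not in oovs stays as the <unk> id.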
    def outputids2words(self, ids, src_oovs):
        """Maps output ids to words.
        Args:
            ids: list of ids
            src_oovs: list of OOV words from the source text
        Returns:
            words: list of words mapped from ids
        """
        words = []
        extended_vocab = self.extend(src_oovs)
        for i in ids:
            try:
                w = self.id2word(i)  # might be an extended (OOV) id
            except ValueError:
                assert src_oovs is not None, \
                    "Error: model produced a word ID that isn't in the vocabulary."
                try:
                    w = extended_vocab[i]
                except IndexError:
                    raise ValueError(
                        f'Error: model produced word ID {i} but this example '
                        f'only has {len(src_oovs)} article OOVs')
            words.append(w)
        return words
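
    # Round-trip example (illustrative): given src_oovs == ['dies'] and an
    # output id equal to tokenizer.size() (the first extended id),
    # outputids2words maps it back to the string 'dies'; ids below
    # tokenizer.size() are resolved through id2word as usual.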
def build_tokenizer(data_dir):
    # if os.path.exists(os.path.join(data_dir, 'word2idx.pkl')):
    #     print('>>> loading {0} tokenizer...'.format(data_dir))
    #     with open(os.path.join(data_dir, 'word2idx.pkl'), 'rb') as f:
    #         word2idx = pickle.load(f)
    #     tokenizer = Tokenizer(word2idx=word2idx)
    # else:
    filenames = [os.path.join(data_dir, '%s.txt' % set_type)
                 for set_type in ['train', 'dev', 'test']]
    all_text = ''
    for filename in filenames:
        print('>>> reading:', filename)
        with open(filename, 'r', encoding='utf-8') as fp:
            lines = fp.readlines()
        # every example spans two lines: the sentence, then its target sequence
        for i in range(0, len(lines), 2):
            text = lines[i].strip()
            all_text += (text + ' ')
    tokenizer = Tokenizer()
    tokenizer.fit_on_text(all_text)
    print('>>> saving {0} tokenizer...'.format(data_dir))
    with open(os.path.join(data_dir, 'word2idx.pkl'), 'wb') as f:
        pickle.dump(tokenizer.word2idx, f)
    return tokenizer
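

# Usage sketch (illustrative; assumes data_dir contains train.txt, dev.txt and
# test.txt in the two-lines-per-example format read above). './data/14lap' is
# a hypothetical directory name:
#     tokenizer = build_tokenizer('./data/14lap')
#     print(tokenizer.size())   # vocabulary size, including the special tokens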
class ASTEDataReader(object):
    def __init__(self, data_dir):
        # NO_RELATION is 0
        self.polarity_map = {'neutral': 1, 'negative': 2, 'positive': 3}
        self.reverse_polarity_map = {
            v: k for k, v in self.polarity_map.items()}
        self.data_dir = data_dir

    def get_train(self, tokenizer):
        return self._create_dataset('train', tokenizer)

    def get_dev(self, tokenizer):
        return self._create_dataset('dev', tokenizer)

    def get_test(self, tokenizer):
        return self._create_dataset('test', tokenizer)

    def _create_dataset(self, set_type, tokenizer):
        all_data = []
        file_name = os.path.join(self.data_dir, '%s.txt' % set_type)
        with open(file_name, 'r', encoding='utf-8') as fp:
            lines = fp.readlines()
        # every example spans two lines: the sentence, then its target sequence
        for i in range(0, len(lines), 2):
            text = lines[i].strip().lower()
            target = lines[i + 1].strip().lower()
            text_indices = tokenizer.text_to_sequence(text)
            target_indices = tokenizer.text_to_sequence(target)
            data = {
                'text': text,
                'target': target,
                'text_indices': text_indices,
                'target_indices': target_indices,
            }
            all_data.append(data)
        return all_data
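

if __name__ == '__main__':
    # Minimal end-to-end sketch (illustrative only; './data/14lap' is a
    # hypothetical data directory with train/dev/test .txt files).
    data_dir = './data/14lap'
    tokenizer = build_tokenizer(data_dir)
    reader = ASTEDataReader(data_dir)
    train_data = reader.get_train(tokenizer)
    print('train examples:', len(train_data))
    print('vocab size:', tokenizer.size())
    # Building the GloVe-initialized embedding matrix additionally requires
    # ./glove/glove.840B.300d.txt to be present:
    # embedding_matrix = build_embedding_matrix(
    #     data_dir, tokenizer.word2idx, 300, '14lap')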