# model.py
import math
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.autograd import Variable
from embed_regularize import embedded_dropout
from locked_dropout import LockedDropout
from weight_drop import WeightDrop
class RNNModel(nn.Module):
    """Stacked-LSTM language model with a mixture-of-softmaxes output layer.

    Token ids go through an embedding, a stack of single-layer LSTMs and a
    ``latent`` projection that yields ``n_experts`` vectors of width ``ninp``
    per position.  Each vector is decoded into a softmax over ``ntoken``
    entries; the softmaxes are then averaged using weights produced by the
    ``prior`` layer.
    """

    def __init__(self, rnn_type, ntoken, ninp, nhid, nhidlast, nlayers,
                 dropout=0.5, dropouth=0.5, dropouti=0.5, dropoute=0.1, wdrop=0,
                 tie_weights=False, ldropout=0.5, n_experts=10):
        """Build the sub-modules and print the total parameter count.

        Args:
            rnn_type: Stored as ``self.rnn_type`` only; the layers built here
                are always ``torch.nn.LSTM`` regardless of its value.
            ntoken: Vocabulary size (embedding rows / decoder outputs).
            ninp: Embedding width, also the width of each expert vector.
            nhid: Hidden size of every LSTM layer except the last.
            nhidlast: Hidden size of the last LSTM layer.
            nlayers: Number of stacked single-layer LSTMs.
            dropout: Rate handed to ``LockedDropout`` on the last layer's output.
            dropouth: Rate handed to ``LockedDropout`` between LSTM layers.
            dropouti: Rate handed to ``LockedDropout`` on the embedded input.
            dropoute: Rate handed to ``embedded_dropout`` (training only).
            wdrop: If truthy, each LSTM is wrapped in ``WeightDrop`` targeting
                ``weight_hh_l0`` with this rate.
            tie_weights: Share the decoder weight with the embedding weight.
            ldropout: Rate handed to ``LockedDropout`` on the latent vectors.
            n_experts: Number of softmax components in the mixture.
        """
        super(RNNModel, self).__init__()
        self.use_dropout = True
        self.lockdrop = LockedDropout()
        self.encoder = nn.Embedding(ntoken, ninp)

        # One single-layer LSTM per level so each can have its own width and
        # be wrapped individually.
        top = nlayers - 1
        layers = []
        for idx in range(nlayers):
            in_width = ninp if idx == 0 else nhid
            out_width = nhidlast if idx == top else nhid
            layers.append(torch.nn.LSTM(in_width, out_width, 1, dropout=0))
        if wdrop:
            hh_rate = wdrop if self.use_dropout else 0
            layers = [WeightDrop(layer, ['weight_hh_l0'], dropout=hh_rate)
                      for layer in layers]
        self.rnns = torch.nn.ModuleList(layers)

        self.prior = nn.Linear(nhidlast, n_experts, bias=False)
        latent_proj = nn.Linear(nhidlast, n_experts * ninp)
        self.latent = nn.Sequential(latent_proj, nn.Tanh())
        self.decoder = nn.Linear(ninp, ntoken)

        # Optional weight tying, following
        #   "Using the Output Embedding to Improve Language Models"
        #   (Press & Wolf 2016), https://arxiv.org/abs/1608.05859
        #   "Tying Word Vectors and Word Classifiers: A Loss Framework for
        #   Language Modeling" (Inan et al. 2016), https://arxiv.org/abs/1611.01462
        # The decoder maps ninp -> ntoken, so its weight already has the
        # embedding's (ntoken, ninp) shape and no nhid == ninp check is made.
        if tie_weights:
            self.decoder.weight = self.encoder.weight

        self.init_weights()

        self.rnn_type = rnn_type
        self.ninp = ninp
        self.nhid = nhid
        self.nhidlast = nhidlast
        self.nlayers = nlayers
        self.dropout = dropout
        self.dropouti = dropouti
        self.dropouth = dropouth
        self.dropoute = dropoute
        # Both spellings are kept; forward() reads ``dropoutl``.
        self.ldropout = ldropout
        self.dropoutl = ldropout
        self.n_experts = n_experts
        self.ntoken = ntoken

        total = sum(p.nelement() for p in self.parameters())
        print('param size: {}'.format(total))

    def init_weights(self):
        """Draw encoder/decoder weights from U(-0.1, 0.1) and zero the decoder bias."""
        bound = 0.1
        for layer in (self.encoder, self.decoder):
            layer.weight.data.uniform_(-bound, bound)
        self.decoder.bias.data.fill_(0)

    def forward(self, input, hidden, return_h=False, return_prob=False):
        """Run the model and return per-token (log-)probabilities.

        Args:
            input: Token-id tensor; dimension 1 is used as the batch size
                (presumably laid out as (seq_len, batch) -- confirm with callers).
            hidden: Per-layer recurrent states, indexed ``hidden[layer]``,
                e.g. the list produced by ``init_hidden``.
            return_h: Also return the per-layer raw and dropped-out outputs.
            return_prob: Return probabilities instead of log-probabilities.

        Returns:
            ``(model_output, hidden)``, or
            ``(model_output, hidden, raw_outputs, outputs)`` when ``return_h``
            is set.  ``model_output`` is viewed as ``(-1, batch, ntoken)``.
        """
        batch_size = input.size(1)
        dropping = self.use_dropout

        embed_rate = self.dropoute if (self.training and dropping) else 0
        emb = embedded_dropout(self.encoder, input, dropout=embed_rate)
        emb = self.lockdrop(emb, self.dropouti if dropping else 0)

        top = self.nlayers - 1
        layer_in = emb
        next_hidden, raw_outputs, outputs = [], [], []
        for idx, rnn in enumerate(self.rnns):
            layer_out, state = rnn(layer_in, hidden[idx])
            next_hidden.append(state)
            raw_outputs.append(layer_out)
            if idx != top:
                # Intermediate layers get dropout here; the last layer's
                # output is handled after the loop with its own rate.
                layer_out = self.lockdrop(layer_out, self.dropouth if dropping else 0)
                outputs.append(layer_out)
            layer_in = layer_out
        hidden = next_hidden

        output = self.lockdrop(layer_in, self.dropout if dropping else 0)
        outputs.append(output)

        # n_experts vectors of width ninp per position, each decoded separately.
        latent = self.latent(output)
        latent = self.lockdrop(latent, self.dropoutl if dropping else 0)
        logit = self.decoder(latent.view(-1, self.ninp))

        # Mixture weights over the experts for every position.
        prior_logit = self.prior(output).contiguous().view(-1, self.n_experts)
        prior = F.softmax(prior_logit, dim=-1)

        expert_prob = F.softmax(logit.view(-1, self.ntoken), dim=-1)
        expert_prob = expert_prob.view(-1, self.n_experts, self.ntoken)
        prob = (expert_prob * prior.unsqueeze(2).expand_as(expert_prob)).sum(1)

        if return_prob:
            model_output = prob
        else:
            # Small constant added before the log so it never sees an exact zero.
            model_output = torch.log(prob.add_(1e-8))
        model_output = model_output.view(-1, batch_size, self.ntoken)

        if return_h:
            return model_output, hidden, raw_outputs, outputs
        return model_output, hidden

    def init_hidden(self, bsz):
        """Return zeroed state pairs, one ``(1, bsz, width)`` pair per layer.

        ``width`` is ``nhidlast`` for the last layer and ``nhid`` otherwise.
        The tensors are created from the model's first parameter, so they
        share its tensor type.
        """
        weight = next(self.parameters()).data
        top = self.nlayers - 1

        def zeros(width):
            return Variable(weight.new(1, bsz, width).zero_())

        states = []
        for idx in range(self.nlayers):
            width = self.nhidlast if idx == top else self.nhid
            states.append((zeros(width), zeros(width)))
        return states
if __name__ == '__main__':
    # Smoke test: build a small two-layer model and push one random batch
    # of token ids through it.
    vocab_size, seq_len, batch = 10, 13, 9
    model = RNNModel('LSTM', vocab_size, 12, 12, 12, 2)
    tokens = Variable(torch.LongTensor(seq_len, batch).random_(0, vocab_size))
    state = model.init_hidden(batch)
    model(tokens, state)
    # NOTE(review): the disabled check below calls model.sample(...), which is
    # not defined on RNNModel as visible in this file -- confirm before enabling.
    # input = Variable(torch.LongTensor(13, 9).random_(0, 10))
    # hidden = model.init_hidden(9)
    # print(model.sample(input, hidden, 5, 6, 1, 2, sample_latent=True).size())