mvn.py
from __future__ import print_function
import torch
import numpy as np
from torch.autograd import Function
"""
Note:
1. Here is calculating the log distribution, not the original distribution
2. For now, not Support Batch: to resolve this problem, we should wrap a loop outside this function
3. the leverage of numpy shared memory here will not work in cuda tensor: not so nice solution is from the
beginning of entrance of the function, check if they are in GPU; transfer them to CPU without exception and
to see if we need to transfer them back to GPU according to a flag
4. convert input to double
"""
class MVNLoss(Function):
    def __init__(self):
        super(MVNLoss, self).__init__()
        self.dim = None              # dimension of the distribution
        self.input_minus_miu = None  # (input - miu), cached for backward
        self.inv_sing_mat = None     # inverse of the covariance matrix, cached for backward
        self.double_flag = None      # True if the caller passed double tensors
    def forward(self, miu_vec, cov_mat, input_vec):
        """
        :param miu_vec: mean vector miu of the distribution, shape [1, dim]
        :param cov_mat: covariance matrix sigma, shape [dim, dim]
        :param input_vec: ground-truth sample from the training set, shape [1, dim]
        :return: the log pdf of input_vec under N(miu_vec, cov_mat), as a [1, 1] tensor
        """
        try:
            if (isinstance(cov_mat, torch.cuda.DoubleTensor) and isinstance(input_vec, torch.cuda.DoubleTensor)) or \
                    (isinstance(cov_mat, torch.DoubleTensor) and isinstance(input_vec, torch.DoubleTensor)):
                self.double_flag = True
            elif (isinstance(cov_mat, torch.cuda.FloatTensor) and isinstance(input_vec, torch.cuda.FloatTensor)) or \
                    (isinstance(cov_mat, torch.FloatTensor) and isinstance(input_vec, torch.FloatTensor)):
                self.double_flag = False
            else:
                raise RuntimeError("params and input have different precisions")
        except RuntimeError as err:
            print(err)
        # work in double precision internally (see note 4 above)
        miu_vec = miu_vec.double()
        cov_mat = cov_mat.double()
        input_vec = input_vec.double()
        dim = cov_mat.size(0)
        assert input_vec.size(1) == dim, "dimension does not match"
        input_minus_miu = input_vec - miu_vec  # [1, dim] row vector
        inv_sing_mat = torch.inverse(cov_mat)
        self.dim = dim
        self.input_minus_miu = input_minus_miu
        self.inv_sing_mat = inv_sing_mat
        det = np.linalg.det(cov_mat.cpu().numpy())
        try:
            if det <= 0:
                raise ValueError("non-positive determinant of covariance matrix")
        except ValueError as err:
            print(err)
# print('{}\n'.format(det))
output = - dim/2.0 * np.log(2*np.pi) + (-1.0/2) * np.log(det) + \
(-1.0/2)*torch.mm(torch.mm(input_minus_miu, inv_sing_mat), input_minus_miu.t())
# the output is actually the log pdf
if not self.double_flag:
output = output.float()
return output
    def backward(self, grad_output):
        """
        Two assumptions are made here:
        1. the covariance matrix constructed from the output of the RNN is positive definite
        2. the parameters obtained from the RNN through this operation have shape [1, ~],
           while the input has shape [dim, 1]
        """
grad_output = grad_output.double()
grad_output = grad_output[0, 0] # extract the scalar from PyTorch tensor
inv_sing_mat = self.inv_sing_mat
input_minus_miu = self.input_minus_miu
dim = self.dim
# gradients of covariance matrix
mid_result1 = torch.mm(input_minus_miu.t(), input_minus_miu)
grad_sigma1 = -torch.mm(torch.mm(inv_sing_mat, mid_result1), inv_sing_mat)
grad_sigma2 = inv_sing_mat
grad_sigma = -(1.0/2)*(grad_sigma1 + grad_sigma2)*grad_output
# grad_sigma = 2.0*grad_sigma - grad_sigma*torch.eye(dim).double()
# gradients of miu
grad_miu = -torch.mm(inv_sing_mat, -input_minus_miu.t())*grad_output
# gradients of input
grad_input = -torch.mm(inv_sing_mat, input_minus_miu.t())*grad_output
if not self.double_flag:
grad_sigma = grad_sigma.float()
grad_input = grad_input.float()
grad_miu = grad_miu.float()
return grad_miu, grad_sigma, grad_input
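

# ----------------------------------------------------------------------------
# Minimal usage sketch (added; not part of the original module). It calls forward()
# directly just to check the value of the log density against
# scipy.stats.multivariate_normal, which is an extra dependency used only for this
# check. Using MVNLoss inside autograd additionally requires a PyTorch version that
# still supports legacy (non-static) autograd Functions.
if __name__ == "__main__":
    from scipy.stats import multivariate_normal

    dim = 3
    miu = torch.zeros(1, dim).double()      # mean vector, shape [1, dim]
    cov = (torch.eye(dim) * 2.0).double()   # covariance matrix, shape [dim, dim]
    x = torch.ones(1, dim).double()         # one sample, shape [1, dim]

    log_p = MVNLoss().forward(miu, cov, x)  # [1, 1] tensor holding the log pdf
    ref = multivariate_normal.logpdf(x.numpy().ravel(),
                                     mean=miu.numpy().ravel(),
                                     cov=cov.numpy())
    print(log_p[0, 0], ref)                 # the two values should agree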