imputation_networks.py
from torch import nn
from torch.optim import Adam
from mask_generators import MCARGenerator
from nn_utils import MemoryLayer, SkipConnection
from prob_utils import CategoricalToOneHotLayer, GaussianCategoricalLoss, \
GaussianCategoricalSampler, SetGaussianSigmasToOne


def get_imputation_networks(one_hot_max_sizes):
    """
    Builds the neural networks for imputation, given the list of one-hot
    max sizes of the dataset features.

    Returns a dictionary with those networks together with the
    reconstruction log-probability function, an optimizer constructor,
    a sampler for the generator output, a mask generator, the batch size,
    and a scale factor that stabilizes the optimization of the variational
    lower bound. See the usage sketch at the bottom of this file.
    """
width = 256
depth = 10
latent_dim = 64

    # Proposal network q(z | x, b): encodes the full object x together
    # with the mask b of unobserved features.
proposal_layers = [
CategoricalToOneHotLayer(one_hot_max_sizes +
[0] * len(one_hot_max_sizes),
list(range(len(one_hot_max_sizes)))),
nn.Linear(sum(max(1, x) for x in one_hot_max_sizes) +
len(one_hot_max_sizes) * 2,
width),
nn.LeakyReLU(),
]
for i in range(depth):
proposal_layers.append(
SkipConnection(
nn.Linear(width, width),
nn.LeakyReLU(),
)
)
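    # The final layer outputs the parameters (mu, sigma) of the
    # Gaussian over the latent code, hence latent_dim * 2 values.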
proposal_layers.append(
nn.Linear(width, latent_dim * 2)
)
proposal_network = nn.Sequential(*proposal_layers)

    # Prior network: sees only the observed part of the object and the
    # mask; its MemoryLayers store intermediate activations for skip
    # connections into the generative network.
prior_layers = [
CategoricalToOneHotLayer(one_hot_max_sizes +
[0] * len(one_hot_max_sizes)),
MemoryLayer('#input'),
nn.Linear(sum(max(1, x) for x in one_hot_max_sizes) +
len(one_hot_max_sizes),
width),
nn.LeakyReLU(),
]
for i in range(depth):
prior_layers.append(
SkipConnection(
# skip-connection from prior network to generative network
MemoryLayer('#%d' % i),
nn.Linear(width, width),
nn.LeakyReLU(),
)
)
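    # Store the deepest hidden state too, then map it to the
    # latent Gaussian parameters (mu, sigma).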
prior_layers.extend([
MemoryLayer('#%d' % depth),
nn.Linear(width, latent_dim * 2),
])
prior_network = nn.Sequential(*prior_layers)

    # Generative network: maps a latent sample back to the distribution
    # parameters of the features, consuming the prior network's stored
    # activations through MemoryLayers in reverse order.
    generative_layers = [
        nn.Linear(latent_dim, width),
        nn.LeakyReLU(),
    ]
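    # depth + 1 blocks: each one concatenates the activation stored by
    # the prior network (MemoryLayer in output mode, hence width * 2
    # inputs), read back in reverse order from '#depth' down to '#0'.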
for i in range(depth + 1):
generative_layers.append(
SkipConnection(
# skip-connection from prior network to generative network
MemoryLayer('#%d' % (depth - i), True),
nn.Linear(width * 2, width),
nn.LeakyReLU(),
)
)
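    # The output layer produces, for each feature, either the logits of
    # a categorical distribution or the (mu, sigma) of a Gaussian, hence
    # max(2, x) values per feature; SetGaussianSigmasToOne then fixes
    # the Gaussian sigmas to one.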
generative_layers.extend([
MemoryLayer('#input', True),
nn.Linear(width + sum(max(1, x) for x in one_hot_max_sizes) +
len(one_hot_max_sizes),
sum(max(2, x) for x in one_hot_max_sizes)),
SetGaussianSigmasToOne(one_hot_max_sizes),
])
generative_network = nn.Sequential(*generative_layers)

    return {
        'batch_size': 64,
        # log-likelihood of the data under the generative output:
        # Gaussian for real-valued features, categorical for one-hot ones
        'reconstruction_log_prob': GaussianCategoricalLoss(one_hot_max_sizes),
        # samples (here: the most probable values) from the generator output
        'sampler': GaussianCategoricalSampler(one_hot_max_sizes,
                                              sample_most_probable=True),
        # normalizing the VLB by the number of features stabilizes optimization
        'vlb_scale_factor': 1 / len(one_hot_max_sizes),
        'optimizer': lambda parameters: Adam(parameters, lr=3e-4),
        # MCAR mask: each value is hidden independently with probability 0.2
        'mask_generator': MCARGenerator(0.2),
        'proposal_network': proposal_network,
        'prior_network': prior_network,
        'generative_network': generative_network,
    }
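

# Usage sketch. This is a minimal illustration under two assumptions: that
# MCARGenerator instances are callable on a batch and return a 0/1 mask of
# the same shape (1 marks values to hide and impute), and that the proposal
# network consumes the concatenation of the batch and the mask, which is
# consistent with the input sizes computed in get_imputation_networks above.
if __name__ == '__main__':
    import torch

    one_hot_max_sizes = [1, 1, 1]  # three real-valued features
    networks = get_imputation_networks(one_hot_max_sizes)

    batch = torch.randn(networks['batch_size'], len(one_hot_max_sizes))
    mask = networks['mask_generator'](batch)  # assumed callable, see above

    # mu and sigma for each of the 64 latent dimensions
    latent_params = networks['proposal_network'](
        torch.cat([batch, mask], dim=1))
    print(latent_params.shape)  # expected: (64, 128)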