#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import torch
import numpy as np
from scipy import optimize
def nanmean(v, *args, **kwargs):
"""
    A PyTorch version of NumPy's nanmean.
"""
v = v.clone()
is_nan = torch.isnan(v)
v[is_nan] = 0
return v.sum(*args, **kwargs) / (~is_nan).float().sum(*args, **kwargs)
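# Usage sketch (editor's illustration, not part of the original file):
#     v = torch.tensor([1., float('nan'), 3.])
#     nanmean(v)       # tensor(2.) -- NaNs are ignored in both sum and count
#     M = torch.tensor([[1., float('nan')], [3., 4.]])
#     nanmean(M, 0)    # per-column means: tensor([2., 4.])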
#### Quantile ######
def quantile(X, q, dim=None):
"""
Returns the q-th quantile.
Parameters
----------
X : torch.DoubleTensor or torch.cuda.DoubleTensor, shape (n, d)
Input data.
q : float
Quantile level (starting from lower values).
    dim : int or None, default = None
        Dimension along which to compute quantiles. If None, the tensor is flattened and a single value is returned.
    Returns
    -------
    quantiles : torch.DoubleTensor
    """
    if dim is None:
        # Flatten first so that a single quantile is returned, as documented
        # (kthvalue alone would otherwise operate on the last dimension)
        X = X.flatten()
        dim = 0
    # kthvalue requires k >= 1, so clamp for very small quantile levels
    return X.kthvalue(max(1, int(q * X.shape[dim])), dim=dim)[0]
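# Example (editor's illustration): the q-th quantile is the k-th smallest
# value with k = int(q * n), so on 1..10 the 0.5 quantile is the 5th smallest:
#     quantile(torch.arange(1., 11.), 0.5, dim=0)   # tensor(5.)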
#### Automatic selection of the regularization parameter ####
def pick_epsilon(X, quant=0.5, mult=0.05, max_points=2000):
"""
Returns a quantile (times a multiplier) of the halved pairwise squared distances in X.
Used to select a regularization parameter for Sinkhorn distances.
Parameters
----------
X : torch.DoubleTensor or torch.cuda.DoubleTensor, shape (n, d)
Input data on which distances will be computed.
quant : float, default = 0.5
Quantile to return (default is median).
    mult : float, default = 0.05
        Multiplier to apply to the quantile.
max_points : int, default = 2000
If the length of X is larger than max_points, estimate the quantile on a random subset of size max_points to
avoid memory overloads.
Returns
-------
epsilon: float
"""
    means = nanmean(X, 0)
    X_ = X.clone()
    mask = torch.isnan(X_)
    # Impute missing entries with column means before computing distances
    X_[mask] = (mask * means)[mask]
    # Subsample at most max_points rows to keep the pairwise-distance matrix small
    idx = np.random.choice(len(X_), min(max_points, len(X_)), replace=False)
    X = X_[idx]
dists = ((X[:, None] - X) ** 2).sum(2).flatten() / 2.
dists = dists[dists > 0]
return quantile(dists, quant, 0).item() * mult
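# Usage sketch (editor's illustration): a typical call on data that already
# contains NaNs; the returned scalar is then used as the Sinkhorn regularization.
#     X = torch.randn(1000, 5, dtype=torch.double)
#     X[torch.rand(1000, 5) < 0.2] = float('nan')
#     eps = pick_epsilon(X)   # 0.05 * median of the halved pairwise squared distances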
#### Accuracy Metrics ####
def MAE(X, X_true, mask):
"""
    Mean Absolute Error (MAE) between imputed variables and ground truth. PyTorch/NumPy agnostic.
Parameters
----------
X : torch.DoubleTensor or np.ndarray, shape (n, d)
Data with imputed variables.
X_true : torch.DoubleTensor or np.ndarray, shape (n, d)
Ground truth.
mask : torch.BoolTensor or np.ndarray of booleans, shape (n, d)
Missing value mask (missing if True)
Returns
-------
MAE : float
"""
if torch.is_tensor(mask):
mask_ = mask.bool()
return torch.abs(X[mask_] - X_true[mask_]).sum() / mask_.sum()
else: # should be an ndarray
mask_ = mask.astype(bool)
return np.absolute(X[mask_] - X_true[mask_]).sum() / mask_.sum()
def RMSE(X, X_true, mask):
"""
    Root Mean Squared Error (RMSE) between imputed variables and ground truth. PyTorch/NumPy agnostic.
Parameters
----------
X : torch.DoubleTensor or np.ndarray, shape (n, d)
Data with imputed variables.
X_true : torch.DoubleTensor or np.ndarray, shape (n, d)
Ground truth.
mask : torch.BoolTensor or np.ndarray of booleans, shape (n, d)
Missing value mask (missing if True)
Returns
-------
RMSE : float
"""
if torch.is_tensor(mask):
mask_ = mask.bool()
return (((X[mask_] - X_true[mask_]) ** 2).sum() / mask_.sum()).sqrt()
else: # should be an ndarray
mask_ = mask.astype(bool)
return np.sqrt(((X[mask_] - X_true[mask_])**2).sum() / mask_.sum())
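# Usage sketch covering both metrics (editor's illustration): score a trivial
# zero-imputation against the ground truth on the masked entries only.
#     X_true = torch.randn(100, 3)
#     mask = torch.rand(100, 3) < 0.2        # True = missing
#     X_imp = X_true.clone(); X_imp[mask] = 0.
#     MAE(X_imp, X_true, mask), RMSE(X_imp, X_true, mask)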
##################### MISSING DATA MECHANISMS #############################
##### Missing At Random ######
def MAR_mask(X, p, p_obs):
"""
Missing at random mechanism with a logistic masking model. First, a subset of variables with *no* missing values is
randomly selected. The remaining variables have missing values according to a logistic model with random weights,
re-scaled so as to attain the desired proportion of missing values on those variables.
Parameters
----------
X : torch.DoubleTensor or np.ndarray, shape (n, d)
Data for which missing values will be simulated. If a numpy array is provided,
it will be converted to a pytorch tensor.
p : float
Proportion of missing values to generate for variables which will have missing values.
p_obs : float
Proportion of variables with *no* missing values that will be used for the logistic masking model.
Returns
-------
mask : torch.BoolTensor or np.ndarray (depending on type of X)
Mask of generated missing values (True if the value is missing).
"""
n, d = X.shape
to_torch = torch.is_tensor(X) ## output a pytorch tensor, or a numpy array
if not to_torch:
X = torch.from_numpy(X)
mask = torch.zeros(n, d).bool() if to_torch else np.zeros((n, d)).astype(bool)
d_obs = max(int(p_obs * d), 1) ## number of variables that will have no missing values (at least one variable)
d_na = d - d_obs ## number of variables that will have missing values
### Sample variables that will all be observed, and those with missing values:
idxs_obs = np.random.choice(d, d_obs, replace=False)
idxs_nas = np.array([i for i in range(d) if i not in idxs_obs])
### Other variables will have NA proportions that depend on those observed variables, through a logistic model
### The parameters of this logistic model are random.
### Pick coefficients so that W^Tx has unit variance (avoids shrinking)
coeffs = pick_coeffs(X, idxs_obs, idxs_nas)
### Pick the intercepts to have a desired amount of missing values
intercepts = fit_intercepts(X[:, idxs_obs], coeffs, p)
ps = torch.sigmoid(X[:, idxs_obs].mm(coeffs) + intercepts)
ber = torch.rand(n, d_na)
mask[:, idxs_nas] = ber < ps
return mask
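# Usage sketch (editor's illustration): draw a MAR mask and apply it.
#     X = torch.randn(1000, 10, dtype=torch.double)
#     mask = MAR_mask(X, p=0.3, p_obs=0.5)   # ~30% missing on half of the columns
#     X_nas = X.clone(); X_nas[mask] = float('nan')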
##### Missing not at random ######
def MNAR_mask_logistic(X, p, p_params=.3, exclude_inputs=True):
"""
Missing not at random mechanism with a logistic masking model. It implements two mechanisms:
(i) Missing probabilities are selected with a logistic model, taking all variables as inputs. Hence, values that are
inputs can also be missing.
    (ii) Variables are split into a set of inputs for a logistic model, and a set whose missing probabilities are
    determined by the logistic model. The inputs are then masked MCAR (hence, missing values from the second set will
    depend on masked values).
In either case, weights are random and the intercept is selected to attain the desired proportion of missing values.
Parameters
----------
X : torch.DoubleTensor or np.ndarray, shape (n, d)
Data for which missing values will be simulated.
If a numpy array is provided, it will be converted to a pytorch tensor.
p : float
Proportion of missing values to generate for variables which will have missing values.
p_params : float
Proportion of variables that will be used for the logistic masking model (only if exclude_inputs).
exclude_inputs : boolean, default=True
True: mechanism (ii) is used, False: (i)
Returns
-------
mask : torch.BoolTensor or np.ndarray (depending on type of X)
Mask of generated missing values (True if the value is missing).
"""
n, d = X.shape
to_torch = torch.is_tensor(X) ## output a pytorch tensor, or a numpy array
if not to_torch:
X = torch.from_numpy(X)
mask = torch.zeros(n, d).bool() if to_torch else np.zeros((n, d)).astype(bool)
d_params = max(int(p_params * d), 1) if exclude_inputs else d ## number of variables used as inputs (at least 1)
d_na = d - d_params if exclude_inputs else d ## number of variables masked with the logistic model
### Sample variables that will be parameters for the logistic regression:
idxs_params = np.random.choice(d, d_params, replace=False) if exclude_inputs else np.arange(d)
idxs_nas = np.array([i for i in range(d) if i not in idxs_params]) if exclude_inputs else np.arange(d)
### Other variables will have NA proportions selected by a logistic model
### The parameters of this logistic model are random.
### Pick coefficients so that W^Tx has unit variance (avoids shrinking)
coeffs = pick_coeffs(X, idxs_params, idxs_nas)
### Pick the intercepts to have a desired amount of missing values
intercepts = fit_intercepts(X[:, idxs_params], coeffs, p)
ps = torch.sigmoid(X[:, idxs_params].mm(coeffs) + intercepts)
ber = torch.rand(n, d_na)
mask[:, idxs_nas] = ber < ps
## If the inputs of the logistic model are excluded from MNAR missingness,
## mask some values used in the logistic model at random.
## This makes the missingness of other variables potentially dependent on masked values
if exclude_inputs:
mask[:, idxs_params] = torch.rand(n, d_params) < p
return mask
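# Usage sketch (editor's illustration, with X as in the MAR example above):
#     mask_ii = MNAR_mask_logistic(X, p=0.3)                        # mechanism (ii)
#     mask_i = MNAR_mask_logistic(X, p=0.3, exclude_inputs=False)   # mechanism (i)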
def MNAR_self_mask_logistic(X, p):
"""
    Missing not at random mechanism with a logistic self-masking model. Variables have missing value probabilities
given by a logistic model, taking the same variable as input (hence, missingness is independent from one variable
to another). The intercepts are selected to attain the desired missing rate.
Parameters
----------
X : torch.DoubleTensor or np.ndarray, shape (n, d)
Data for which missing values will be simulated.
If a numpy array is provided, it will be converted to a pytorch tensor.
p : float
Proportion of missing values to generate for variables which will have missing values.
Returns
-------
mask : torch.BoolTensor or np.ndarray (depending on type of X)
Mask of generated missing values (True if the value is missing).
"""
n, d = X.shape
to_torch = torch.is_tensor(X) ## output a pytorch tensor, or a numpy array
if not to_torch:
X = torch.from_numpy(X)
    ### Each variable has an NA proportion that depends on its own values, through a logistic model
### The parameters of this logistic model are random.
### Pick coefficients so that W^Tx has unit variance (avoids shrinking)
coeffs = pick_coeffs(X, self_mask=True)
### Pick the intercepts to have a desired amount of missing values
intercepts = fit_intercepts(X, coeffs, p, self_mask=True)
ps = torch.sigmoid(X * coeffs + intercepts)
ber = torch.rand(n, d) if to_torch else np.random.rand(n, d)
mask = ber < ps if to_torch else ber < ps.numpy()
return mask
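# Usage sketch (editor's illustration): each column masks itself through its
# own logistic model, so missingness is independent across columns.
#     mask = MNAR_self_mask_logistic(X, p=0.3)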
def MNAR_mask_quantiles(X, p, q, p_params, cut='both', MCAR=False):
"""
    Missing not at random mechanism with quantile censorship. First, a subset of variables which will have missing
    values is randomly selected. Then, values beyond the chosen quantile cutoffs are masked at random. Since
    missingness depends on quantile information, it depends on masked values, hence this is a MNAR mechanism.
Parameters
----------
X : torch.DoubleTensor or np.ndarray, shape (n, d)
Data for which missing values will be simulated.
If a numpy array is provided, it will be converted to a pytorch tensor.
p : float
Proportion of missing values to generate for variables which will have missing values.
q : float
Quantile level at which the cuts should occur
p_params : float
Proportion of variables that will have missing values
cut : 'both', 'upper' or 'lower', default = 'both'
Where the cut should be applied. For instance, if q=0.25 and cut='upper', then missing values will be generated
in the upper quartiles of selected variables.
    MCAR : bool, default = False
        If True, an MCAR mechanism with rate p is additionally applied on top, over all variables.
Returns
-------
mask : torch.BoolTensor or np.ndarray (depending on type of X)
Mask of generated missing values (True if the value is missing).
"""
n, d = X.shape
to_torch = torch.is_tensor(X) ## output a pytorch tensor, or a numpy array
if not to_torch:
X = torch.from_numpy(X)
mask = torch.zeros(n, d).bool() if to_torch else np.zeros((n, d)).astype(bool)
d_na = max(int(p_params * d), 1) ## number of variables that will have NMAR values
    ### Sample variables that will have missing values at the extremes
idxs_na = np.random.choice(d, d_na, replace=False) ### select at least one variable with missing values
    ### check whether values are greater/smaller than the corresponding quantiles
if cut == 'upper':
quants = quantile(X[:, idxs_na], 1-q, dim=0)
m = X[:, idxs_na] >= quants
elif cut == 'lower':
quants = quantile(X[:, idxs_na], q, dim=0)
m = X[:, idxs_na] <= quants
elif cut == 'both':
u_quants = quantile(X[:, idxs_na], 1-q, dim=0)
l_quants = quantile(X[:, idxs_na], q, dim=0)
m = (X[:, idxs_na] <= l_quants) | (X[:, idxs_na] >= u_quants)
### Hide some values exceeding quantiles
ber = torch.rand(n, d_na)
mask[:, idxs_na] = (ber < p) & m
if MCAR:
        ## Add an MCAR mechanism on top
mask = mask | (torch.rand(n, d) < p)
return mask
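# Usage sketch (editor's illustration): censor the upper quartile of half the
# columns, hiding about half of the values beyond the 0.75-quantile cutoffs.
#     mask = MNAR_mask_quantiles(X, p=0.5, q=0.25, p_params=0.5, cut='upper')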
def pick_coeffs(X, idxs_obs=None, idxs_nas=None, self_mask=False):
    n, d = X.shape
    if self_mask:
        # Match X's dtype so elementwise broadcasting works when X is a DoubleTensor
        coeffs = torch.randn(d, dtype=X.dtype)
        Wx = X * coeffs
        # Rescale so that each coordinate of W*x has unit variance (avoids shrinking)
        coeffs /= torch.std(Wx, 0)
    else:
        d_obs = len(idxs_obs)
        d_na = len(idxs_nas)
        # Match X's dtype: mm requires both operands to share a dtype
        coeffs = torch.randn(d_obs, d_na, dtype=X.dtype)
        Wx = X[:, idxs_obs].mm(coeffs)
        # Rescale so that each column of XW has unit variance (avoids shrinking)
        coeffs /= torch.std(Wx, 0, keepdim=True)
    return coeffs
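# Sanity check (editor's illustration, assuming idxs_obs / idxs_nas index
# arrays as built in MAR_mask): after rescaling, each column of the logistic
# score X[:, idxs_obs] @ coeffs should have roughly unit standard deviation.
#     coeffs = pick_coeffs(X, idxs_obs, idxs_nas)
#     X[:, idxs_obs].mm(coeffs).std(0)   # ~ tensor([1., 1., ...])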
def fit_intercepts(X, coeffs, p, self_mask=False):
    if self_mask:
        d = len(coeffs)
        intercepts = torch.zeros(d)
        for j in range(d):
            def f(x):
                # Self-masking: variable j's missingness depends on its own values only
                return torch.sigmoid(X[:, j] * coeffs[j] + x).mean().item() - p
            intercepts[j] = optimize.bisect(f, -50, 50)
    else:
        d_obs, d_na = coeffs.shape
        intercepts = torch.zeros(d_na)
        for j in range(d_na):
            def f(x):
                return torch.sigmoid(X.mv(coeffs[:, j]) + x).mean().item() - p
            intercepts[j] = optimize.bisect(f, -50, 50)
    return intercepts
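if __name__ == "__main__":
    # Minimal smoke test (editor's addition, illustrative only): each mechanism
    # should produce missing values at a rate governed by p on the variables it
    # selects, so the overall missing fractions printed below are plausibility
    # checks, not exact targets.
    torch.manual_seed(0)
    np.random.seed(0)
    X = torch.randn(2000, 8, dtype=torch.double)
    masks = {
        "MAR": MAR_mask(X, 0.3, 0.5),
        "MNAR logistic": MNAR_mask_logistic(X, 0.3),
        "MNAR self-masked": MNAR_self_mask_logistic(X, 0.3),
        "MNAR quantiles": MNAR_mask_quantiles(X, 0.3, 0.25, 0.5),
    }
    for name, mask in masks.items():
        print(f"{name}: {mask.float().mean().item():.3f} missing")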