-
Notifications
You must be signed in to change notification settings - Fork 10
/
datasets.py
160 lines (133 loc) · 5.65 KB
/
datasets.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
from math import log, exp
import numpy as np
import pandas as pd
class SimulatedData:
def __init__(self, hr_ratio,
average_death = 5, end_time = 15,
num_features = 10, num_var = 2,
treatment_group = False):
"""
from datasets import SimulatedData
s = SimulatedData(hr_ratio = 2)
s.generate_data(N=500)
Factory class for producing simulated survival data.
Current supports two forms of simulated data:
Linear:
Where risk is a linear combination of an observation's features
Nonlinear (Gaussian):
A gaussian combination of covariates
Parameters:
hr_ratio: lambda_max hazard ratio.
average_death: average death time that is the mean of the
Exponentional distribution.
end_time: censoring time that represents an 'end of study'. Any death
time greater than end_time will be censored.
num_features: size of observation vector. Default: 10.
num_var: number of varaibles simulated data depends on. Default: 2.
treatment_group: True or False. Include an additional covariate
representing a binary treatment group.
"""
self.hr_ratio = hr_ratio
self.end_time = end_time
self.average_death = average_death
self.treatment_group = treatment_group
self.m = int(num_features) + int(treatment_group)
self.num_var = num_var
def _linear_H(self,x):
"""
Calculates a linear combination of x's features.
Coefficients are 1, 2, ..., self.num_var, 0,..0]
Parameters:
x: (n,m) numpy array of observations
Returns:
risk: the calculated linear risk for a set of data x
"""
# Make the coefficients [1,2,...,num_var,0,..0]
b = np.zeros((self.m,))
b[0:self.num_var] = range(1,self.num_var + 1)
# Linear Combinations of Coefficients and Covariates
risk = np.dot(x, b)
return risk
def _gaussian_H(self,x,
c= 0.0, rad= 0.5):
"""
Calculates the Gaussian function of a subset of x's features.
Parameters:
x: (n, m) numpy array of observations.
c: offset of Gaussian function. Default: 0.0.
r: Gaussian scale parameter. Default: 0.5.
Returns:
risk: the calculated Gaussian risk for a set of data x
"""
max_hr, min_hr = log(self.hr_ratio), log(1.0 / self.hr_ratio)
# Z = ( (x_0 - c)^2 + (x_1 - c)^2 + ... + (x_{num_var} - c)^2)
z = np.square((x - c))
z = np.sum(z[:,0:self.num_var], axis = -1)
# Compute Gaussian
risk = max_hr * (np.exp(-(z) / (2 * rad ** 2)))
return risk
def generate_data(self, N,
method = 'gaussian', gaussian_config = {},
**kwargs):
"""
Generates a set of observations according to an exponentional Cox model.
Parameters:
N: the number of observations.
method: the type of simulated data. 'linear' or 'gaussian'.
guassian_config: dictionary of additional parameters for gaussian
simulation.
Returns:
dataset: a dictionary object with the following keys:
'x' : (N,m) numpy array of observations.
't' : (N) numpy array of observed time events.
'e' : (N) numpy array of observed time intervals.
'hr': (N) numpy array of observed true risk.
See:
Peter C Austin. Generating survival times to simulate cox proportional
hazards models with time-varying covariates. Statistics in medicine,
31(29):3946-3958, 2012.
"""
# Patient Baseline information
data = np.random.uniform(low= -1, high= 1,
size = (N,self.m))
if self.treatment_group:
data[:,-1] = np.squeeze(np.random.randint(0,2,(N,1)))
print(data[:,-1])
# Each patient has a uniform death probability
p_death = self.average_death * np.ones((N,1))
# Patients Hazard Model
# \lambda(t|X) = \lambda_0(t) exp(H(x))
#
# risk = True log hazard ratio
# log(\lambda(t|X) / \lambda_0(t)) = H(x)
if method == 'linear':
risk = self._linear_H(data)
elif method == 'gaussian':
risk = self._gaussian_H(data,**gaussian_config)
# Center the hazard ratio so population dies at the same rate
# independent of control group (makes the problem easier)
risk = risk - np.mean(risk)
# Generate time of death for each patient
# currently exponential random variable
death_time = np.zeros((N,1))
for i in range(N):
if self.treatment_group and data[i,-1] == 0:
death_time[i] = np.random.exponential(p_death[i])
else:
death_time[i] = np.random.exponential(p_death[i]) / exp(risk[i])
# Censor anything that is past end time
censoring = np.ones((N,1))
death_time[death_time > self.end_time] = self.end_time
censoring[death_time == self.end_time] = 0
# Flatten Arrays to Vectors
death_time = np.squeeze(death_time)
censoring = np.squeeze(censoring)
dataset = pd.DataFrame({
#only one column of x was used for simplicity
'x' : data[:,0].astype(np.float32),
'e' : censoring.astype(np.int32),
't' : death_time.astype(np.float32),
'hr' : risk.astype(np.float32)
})
dataset.to_csv("simulated_dat.csv",index = False)
return dataset