-
Notifications
You must be signed in to change notification settings - Fork 0
/
IGL_test.py
126 lines (94 loc) · 3.18 KB
/
IGL_test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
import numpy as np
import pandas as pd
# -----------------------------------------------------------------------------------------------------------
ni = 50
nj = 7
beta0 = 2
beta1 = 5
sigma2_a0 = 0.9
sigma2_a1 = 1
sigma_a01 = 0.3
sigma_e = 0.5
mean = [0,0]
covariance = [[sigma2_a0, sigma_a01],[sigma_a01, sigma2_a1]]
effs = np.random.multivariate_normal(mean, covariance, ni)
ids = []
effs_i = []
effs_s = []
for i in range(ni):
ids.extend([i+1] * nj)
effs_i.extend([effs[i][0]] * nj)
effs_s.extend([effs[i][1]] * nj)
data = pd.DataFrame({'id': ids,
'time': list(range(1, nj+1)) * ni,
'effs_i': effs_i,
'effs_s': effs_s})
z_ij = []
for i in range(ni * nj):
z = beta0 + effs_i[i] + ((beta1 + effs_s[i]) * data['time'].iloc[i]) + np.random.normal(0, sigma_e)
z_ij.append(z)
data['z_ij'] = z_ij
# -----------------------------------------------------------------------------------------------------------
nnij = data.shape[0]
z1 = np.zeros((nnij, nnij))
z2 = np.zeros((nnij, nnij))
z3 = np.zeros((nnij, nnij))
z4 = np.zeros((nnij, nnij))
for i in range(nnij):
for j in range(nnij):
subj_i = data['id'].iloc[i]
subj_j = data['id'].iloc[j]
visit_i = data['time'].iloc[i]
visit_j = data['time'].iloc[j]
if subj_i == subj_j and visit_i == visit_j:
z1[i, j] = 1
if subj_i == subj_j:
z2[i, j] = 1
z3[i, j] = visit_i + visit_j
z4[i, j] = visit_i * visit_j
# z2 = z2.T + z2
# np.diag(z2) = np.diag(z2) / 2
xx = np.ones((nnij, 2))
xx[:, 1] = data['time']
b1 = np.linalg.inv(np.matmul(xx.T, xx))
b2 = np.matmul(xx.T, np.asarray(data['z_ij']))
betahat = np.matmul(b1, b2)
vz1 = np.expand_dims(z1.flatten('F'), axis=-1)
vz2 = np.expand_dims(z2.flatten('F'), axis=-1)
vz3 = np.expand_dims(z3.flatten('F'), axis=-1)
vz4 = np.expand_dims(z4.flatten('F'), axis=-1)
zz = np.concatenate((vz1, vz2, vz3, vz4), axis=1)
sig_memory = []
iter = 10
k = 1
while k <= iter:
print('Iteration', k)
zhat = betahat[0] + betahat[1] * np.asarray(data['time'])
ztilde = np.expand_dims(zhat - np.asarray(data['z_ij']), axis=-1)
ztz = np.matmul(ztilde, ztilde.T)
ztz = ztz.flatten('F')
sig_est1 = np.linalg.inv(np.matmul(zz.T, zz))
sig_est2 = np.matmul(zz.T, ztz)
sig_est = np.matmul(sig_est1, sig_est2)
s_e = sig_est[0]
s_a0 = sig_est[1]
s_a01 = sig_est[2]
s_a1 = sig_est[3]
sigma_update = np.zeros((nnij, nnij))
for i in range(nnij):
for j in range(nnij):
subj_i = data['id'].iloc[i]
subj_j = data['id'].iloc[j]
visit_i = data['time'].iloc[i]
visit_j = data['time'].iloc[j]
if subj_i == subj_j:
sigma_update[i, j] = s_a0
if subj_i == subj_j and visit_i == visit_j:
sigma_update[i, j] = s_e + s_a0 + (s_a01 * z3[i, j]) + (s_a1 * z4[i, j])
b1 = np.linalg.inv(xx.T @ np.linalg.inv(sigma_update) @ xx)
b2 = xx.T @ np.linalg.inv(sigma_update) @ np.asarray(data['z_ij'])
betahat = b1 @ b2
sig_memory.append(sig_est.tolist())
k += 1
for i in range(len(sig_memory)):
print(sig_memory[i])