# calculate_DI.pyx
import math

import numpy as np
from scipy.spatial.distance import pdist, squareform


class directed_information:
    def __init__(self, multiplier_X=None, multiplier_Y=None, memory=5, alpha=1.01):
        """
        :param multiplier_X: multiplier to adjust the kernel width of the Gram matrix for X
        :param multiplier_Y: multiplier to adjust the kernel width of the Gram matrix for Y
        :param memory: total length of past history to consider
        :param alpha: order of the Renyi entropy
        """
        self.multiplier_X = multiplier_X
        self.multiplier_Y = multiplier_Y
        self.alpha = alpha
        self.memory = memory
        self.di = None
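
    # Note: alpha close to 1 (the default is 1.01) approximates Shannon entropy,
    # since the Renyi alpha-entropy converges to the Shannon entropy as alpha -> 1.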

    def GaussMat(self, mat, mult=None):
        """
        Calculates the Gram matrix using a Gaussian kernel, which evaluates the
        distances between input samples after projection into an RKHS
        :param mat: input matrix
        :param mult: multiplier to adjust the kernel width of the Gram matrix
        :return: Gram matrix
        """
        # Ensure mat is 2-dimensional (N samples x T dimensions)
        if len(mat.shape) == 1:
            mat = mat.reshape(-1, 1)
        N, T = mat.shape
        # Calculate the kernel bandwidth using Silverman's rule of thumb
        if mult is not None:
            sig = mult * 1.06 * np.nanstd(mat) * N ** (-1 / 5)
        else:
            sig = 1.06 * np.nanstd(mat) * N ** (-1 / 5)
        # Scott's rule would instead be: sig = N ** (-1 / (4 + T))
        # Pairwise squared Euclidean distances between samples
        pairwise_sq_dists = squareform(pdist(mat, 'sqeuclidean'))
        # Gram matrix under the Gaussian kernel
        return np.exp(-pairwise_sq_dists / sig ** 2)
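
    # For example, with mat = np.array([[0.0], [1.0]]) the diagonal entries are
    # exactly 1 and the off-diagonal entries equal exp(-1 / sig ** 2), since the
    # squared Euclidean distance between the two samples is 1.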

    def alpha_entropy(self, mat, mult=None):
        """
        Calculates the matrix-based Renyi alpha-entropy of a variable
        :param mat: input matrix
        :param mult: multiplier to adjust the kernel width of the Gram matrix
        :return: entropy
        """
        # Normalized Gram matrix (trace 1, since each kernel diagonal entry is 1)
        N = len(mat)
        K = self.GaussMat(mat, mult=mult) / N
        # Entropy estimation from the eigenspectrum of the normalized Gram matrix;
        # the min(..., 0.9999) cap keeps the log argument below 1, so H stays strictly positive
        L = np.linalg.eigvalsh(K)
        absL = np.abs(L)
        H = (1 / (1 - self.alpha)) * math.log2(np.min([np.sum(absL ** self.alpha), 0.9999]))
        return H
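
    # In formula form, the estimator above is the matrix-based Renyi entropy
    #     H_alpha(K) = 1 / (1 - alpha) * log2(sum_i lambda_i(K) ** alpha),
    # where lambda_i(K) are the eigenvalues of the trace-normalized Gram matrix.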

    def joint_alpha_entropy(self, mat1, mat2, mat3=None, mult1=None, mult2=None, mult3=None):
        """
        Calculates the joint alpha-entropy of two or three variables
        :param mat1: input matrix 1
        :param mat2: input matrix 2
        :param mat3: optional input matrix 3
        :param mult1: multiplier to adjust the kernel width of the Gram matrix for matrix 1
        :param mult2: multiplier to adjust the kernel width of the Gram matrix for matrix 2
        :param mult3: multiplier to adjust the kernel width of the Gram matrix for matrix 3
        :return: joint entropy
        """
        # Normalized Gram matrices
        N = len(mat1)
        K1 = self.GaussMat(mat1, mult=mult1) / N
        K2 = self.GaussMat(mat2, mult=mult2) / N
        # Hadamard product, rescaled so the result again has trace 1
        if mat3 is None:
            prodK = K1 * K2 * N
        else:
            K3 = self.GaussMat(mat3, mult=mult3) / N
            prodK = K1 * K2 * K3 * (N ** 2)
        # Joint alpha-entropy from the eigenspectrum of the product matrix
        L = np.linalg.eigvalsh(prodK)
        absL = np.abs(L)
        H = (1 / (1 - self.alpha)) * math.log2(np.min([np.sum(absL ** self.alpha), 0.9999]))
        return H
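
    # The Hadamard (entrywise) product of Gram matrices plays the role of a joint
    # Gram matrix here: its eigenspectrum is fed through the same formula as in
    # alpha_entropy, and the N (or N ** 2) rescaling restores a trace of 1.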

    def conditional_mi(self, x_past, y_past, y_present):
        """
        Calculates the conditional mutual information for a given past-history length,
        I(Xpast; Ypresent | Ypast) = H(Xpast, Ypast) + H(Ypresent, Ypast) - H(Xpast, Ypresent, Ypast) - H(Ypast)
        :return: conditional mutual information
        """
        # H(Ypast)
        H_ypast = self.alpha_entropy(y_past, mult=self.multiplier_Y)
        # H(Xpast, Ypast)
        H_xpast_ypast = self.joint_alpha_entropy(x_past, y_past, mult1=self.multiplier_X, mult2=self.multiplier_Y)
        # H(Ypresent, Ypast)
        H_ypresent_ypast = self.joint_alpha_entropy(y_present, y_past, mult1=self.multiplier_Y, mult2=self.multiplier_Y)
        # H(Xpast, Ypresent, Ypast)
        H_xyyp = self.joint_alpha_entropy(x_past, y_present, y_past, mult1=self.multiplier_X, mult2=self.multiplier_Y,
                                          mult3=self.multiplier_Y)
        return H_ypresent_ypast + H_xpast_ypast - H_xyyp - H_ypast
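
    # Note: the identity above guarantees I >= 0 for exact entropies, but the
    # kernel-based estimates can make individual terms slightly negative in
    # practice, so small negative CMI values are possible.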

    def DI(self, X, Y):
        """
        Calculates the directed information from X to Y, DI(X -> Y)
        :param X: causal (source) time series
        :param Y: effect (target) time series
        :return: directed information
        """
        di = 0
        T = len(X)
        for m in range(1, self.memory):  # past lengths m = 1, ..., memory - 1
            # Initialize containers for the length-m past windows and the present sample
            x_past, y_past, y_present = np.zeros((T - m, m)), np.zeros((T - m, m)), np.zeros(T - m)
            # Gather past and present instances
            for t in range(T - m):
                x_past[t] = X[t:t + m]
                y_past[t] = Y[t:t + m]
                y_present[t] = Y[t + m]
            # Evaluate the conditional mutual information and accumulate it into the DI
            di += self.conditional_mi(x_past, y_past, y_present)
        self.di = di
        return self.di
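

# Illustrative usage sketch (not part of the original module): a pair of coupled
# series in which X drives Y with a one-step lag, so DI(X -> Y) is expected to
# exceed DI(Y -> X). The series length, coupling strength, and noise scale below
# are arbitrary choices for demonstration.
if __name__ == "__main__":
    rng = np.random.default_rng(0)
    T = 200
    X = rng.standard_normal(T)
    Y = np.zeros(T)
    for t in range(1, T):
        # Y follows X with a one-step lag plus a small amount of noise
        Y[t] = 0.8 * X[t - 1] + 0.1 * rng.standard_normal()
    estimator = directed_information(memory=5, alpha=1.01)
    print("DI(X -> Y):", estimator.DI(X, Y))
    print("DI(Y -> X):", estimator.DI(Y, X))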