-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathimpurity.py
152 lines (118 loc) · 4.53 KB
/
impurity.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
"""
Impurity measure definition and optimization module.
Currently defines information gain.
Author: Francisco Penedo ([email protected])
"""
from scipy import optimize
from llt import set_llt_pars, SimpleModel, split_groups
from stl import robustness
import numpy as np
import math
def optimize_inf_gain(traces, primitive, rho, disp=False):
    """
    Optimizes the extended information gain for the given labeled traces.

    Runs scipy's differential evolution on inf_gain over the four primitive
    parameters [t0, t1, t3, pi]. The three time parameters are searched in
    [0, maxt], where maxt is the largest value in traces.get_sindex(-1); pi is
    searched between the smallest and largest value of the signal selected by
    primitive.index.

    traces is the labeled trace collection (it must provide get_sindex,
    signals and labels), primitive is the formula to optimize (modified in
    place), rho is the robustness of each trace up until the current node
    (may be None) and disp is forwarded to the optimizer.

    Returns the tuple (primitive, res.fun): the primitive carrying the best
    parameters found and the minimized objective, i.e. the negated
    information gain.
    """
    # [t0, t1, t3, pi]
    maxt = max(np.amax(traces.get_sindex(-1), 1))
    lower = [0, 0, 0, min(np.amin(traces.get_sindex(primitive.index), 1))]
    upper = [maxt, maxt, maxt,
             max(np.amax(traces.get_sindex(primitive.index), 1))]
    models = [SimpleModel(signal) for signal in traces.signals]
    args = (primitive, models, rho, traces, maxt)

    # The bounds must be a real sequence: on Python 3 zip() is a one-shot
    # iterator, which the optimizer cannot index or measure.
    bounds = list(zip(lower, upper))

    res = optimize.differential_evolution(
        inf_gain, bounds=bounds,
        args=args, popsize=10, maxiter=10,
        mutation=0.7, disp=disp,
        init='latinhypercube')

    # inf_gain overwrites the primitive's parameters on every evaluation, so
    # once the search ends the primitive holds whichever sample happened to be
    # evaluated last. Re-apply the best sample so that the returned primitive
    # actually corresponds to res.fun.
    best = _transform_pars(res.x, maxt)
    set_llt_pars(primitive, best[0], best[1], best[2], best[3])

    return primitive, res.fun
def _transform_pars(theta, maxt):
    """
    Maps a raw optimizer sample onto the constrained parameter region.

    theta is [t0, t1, t3, pi], with the three time components sampled in
    [0, maxt]. t1 is rescaled linearly from [0, maxt] to [t0, maxt] and t3
    from [0, maxt] to [0, maxt - t1] (using the rescaled t1), so that
    t0 <= t1 and t1 + t3 <= maxt up to rounding. t0 and pi are returned
    unchanged.

    Returns the transformed parameters as a list [t0, t1, t3, pi].
    """
    t0, t1, t3, pi = theta
    # Force true division: with all-integer inputs Python 2 would otherwise
    # truncate the rescaled values.
    span = float(maxt)
    t1 = t0 + (maxt - t0) * t1 / span
    t3 = (maxt - t1) * t3 / span
    return [t0, t1, t3, pi]
def inf_gain(theta, *args):
    """
    Function to optimize. Obtains the information gain of the sample theta.

    The extra fixed arguments are defined as:

        args = [primitive, models, prev_rho, traces, maxt]

    where primitive is the formula to optimize, models is a list of SimpleModel
    objects associated with each trace for the signal index defined in the
    primitive, prev_rho is the robustness of each trace up until the current
    node, traces is a Traces object and maxt is the maximum sampled time.

    As a side effect the parameters of primitive are overwritten with the
    (transformed) sample.

    Returns the negated information gain, so that a minimizer maximizes the
    gain. Returns np.inf when the transformed sample violates the time
    constraints and 0.0 when the total absolute robustness is zero.
    """
    primitive = args[0]
    models = args[1]
    # May be None, TODO check. Can't do it up in the stack
    prev_rho = args[2]
    traces = args[3]
    maxt = args[4]

    theta = _transform_pars(theta, maxt)

    # Sanity check on the transformed sample; an infinite cost makes the
    # optimizer discard it.
    if theta[1] < theta[0] or theta[1] + theta[2] > maxt:
        print('bad')
        return np.inf

    set_llt_pars(primitive, theta[0], theta[1], theta[2], theta[3])

    # Robustness of every trace for this primitive, combined with the
    # robustness accumulated so far by taking the element-wise minimum.
    lrho = [[robustness(primitive, model) for model in models]]
    if prev_rho is not None:
        lrho.append(prev_rho)
    # Materialize the pairs: they are traversed several times below, and on
    # Python 3 zip() can only be consumed once.
    rho_labels = list(zip(np.amin(lrho, 0), traces.labels))
    # Split on the sign of the robustness (presumably the first group holds
    # the pairs for which the predicate is true; see split_groups).
    sat, unsat = split_groups(rho_labels, lambda x: x[0] >= 0)

    # compute IG
    # Sum of absolute value of the robustness for all traces
    stotal = sum(abs(pair[0]) for pair in rho_labels)
    if stotal == 0:
        # Every robustness is zero (or there are no traces): all pairs fall on
        # the same side of the split, so nothing is gained. Without this guard
        # the weights below would evaluate 0 / 0.
        return 0.0

    ig = _entropy(rho_labels) - _inweights(sat, stotal) * _entropy(sat) - \
        _inweights(unsat, stotal) * _entropy(unsat)

    return -ig
def _inweights(part, stotal):
    """
    Weight of a partition: the share of the total absolute robustness that
    is held by part.

    part is a sequence of (robustness, label) pairs and stotal is the sum of
    absolute robustness over all traces. stotal must be non-zero whenever
    part is not empty.

    Returns 0 for an empty part, otherwise sum(|robustness|) / stotal.
    """
    if len(part) == 0:
        return 0
    # float() keeps the ratio exact for integer robustness values, which
    # Python 2 would otherwise floor-divide.
    return float(sum(abs(p[0]) for p in part)) / stotal
def _entropy(part):
    """
    Robustness-weighted binary entropy of a group of traces.

    part is a sequence of (robustness, label) pairs. Pairs with label >= 0
    form the positive class and pairs with label < 0 the negative class. Each
    class is weighted by its share of the absolute robustness in part; when
    every robustness is zero the classes are weighted by their counts instead.

    Returns 0 for an empty part or when either class has no weight, otherwise
    -w_p * log(w_p) - w_n * log(w_n) using the natural logarithm.
    """
    if len(part) == 0:
        return 0

    spart = float(sum(abs(p[0]) for p in part))
    # Revert to counting when all rho = 0
    if spart == 0:
        count = float(len(part))
        w_p = len([p for p in part if p[1] >= 0]) / count
        w_n = len([p for p in part if p[1] < 0]) / count
    else:
        w_p = sum([abs(p[0]) for p in part if p[1] >= 0]) / spart
        w_n = sum([abs(p[0]) for p in part if p[1] < 0]) / spart

    # A class without weight contributes nothing; this also keeps log() away
    # from zero.
    if w_p <= 0 or w_n <= 0:
        return 0
    return - w_p * math.log(w_p) - w_n * math.log(w_n)
# Dummy functions to test the optimization structure
def optimize_inf_gain_skel(traces, primitive, rho):
    """
    Skeleton of optimize_inf_gain that minimizes the dummy objective
    inf_gain_skel instead of the real information gain.

    The parameter bounds are built in the same way as in optimize_inf_gain:
    the three time parameters range over [0, maxt] and pi over the range of
    the signal selected by primitive.index.

    Returns the tuple (primitive, res.fun), with the primitive carrying the
    best parameters found.
    """
    # [t0, t1, t3, pi]
    maxt = max(np.amax(traces.get_sindex(-1), 1))
    lower = [0, 0, 0, min(np.amin(traces.get_sindex(primitive.index), 1))]
    upper = [maxt, maxt, maxt,
             max(np.amax(traces.get_sindex(primitive.index), 1))]
    models = [SimpleModel(signal) for signal in traces.signals]
    args = (primitive, models, rho)

    # The bounds must be a real sequence: on Python 3 zip() is a one-shot
    # iterator, which the optimizer cannot index or measure.
    bounds = list(zip(lower, upper))

    res = optimize.differential_evolution(inf_gain_skel, bounds=bounds,
                                          args=args)

    # inf_gain_skel overwrites the primitive's parameters on every
    # evaluation; re-apply the best sample so that the returned primitive
    # corresponds to res.fun rather than to the last sample evaluated.
    set_llt_pars(primitive, res.x[0], res.x[1], res.x[2], res.x[3])

    return primitive, res.fun
def inf_gain_skel(theta, *args):
    """
    Dummy objective used to exercise the optimization structure.

    theta is the sample [t0, t1, t3, pi] and args[0] is the primitive, whose
    parameters are overwritten with the sample. Any further fixed arguments
    are accepted and ignored.

    Returns the negated sum of the four sample components minus
    primitive.index.
    """
    # Only the primitive is needed here. The remaining fixed arguments are
    # deliberately not bound to local names: one of them used to shadow the
    # imported robustness function.
    primitive = args[0]
    set_llt_pars(primitive, theta[0], theta[1], theta[2], theta[3])
    return - theta[0] - theta[1] - theta[2] - theta[3] - primitive.index
# Unused
def constrained_sample(theta_scaled):
    """
    Repairs a scaled sample [t0, t1, t3, pi] (all components in [0, 1]) so
    that it satisfies t0 <= t1 and t1 + t3 <= 1.

    The first two components are swapped when they are out of order, and t3
    is cut down to 1 - t1 when the interval would run past 1. pi is returned
    untouched.

    Returns the repaired sample as a list [t0, t1, t3, pi].
    """
    first, second, t3, pi = theta_scaled

    # Put the two time bounds in increasing order
    t0, t1 = (second, first) if first > second else (first, second)

    # Shrink t3 if the interval overshoots the end of the unit range
    overshoots = t1 + t3 > 1
    if overshoots:
        return [t0, t1, 1 - t1, pi]
    return [t0, t1, t3, pi]
def constrained_sample_init(theta_scaled):
    """
    Maps a scaled sample [t0, t1, t3, pi] (all components in [0, 1]) onto the
    constrained region.

    t1 is interpreted as a fraction of the range [t0, 1] and t3 as a fraction
    of the range [0, 1 - t1], where t1 is the already mapped value. t0 and pi
    are returned untouched.

    Returns the mapped sample as a list [t0, t1, t3, pi].
    """
    start, frac_t1, frac_t3, pi = theta_scaled
    # Place t1 between t0 and 1
    end = start + (1 - start) * frac_t1
    # Place t3 between 0 and whatever is left after t1
    width = (1 - end) * frac_t3
    return [start, end, width, pi]