import numpy as np
import random
import tensorflow as tf

from ares.attack.base import BatchAttack
from ares.attack.utils import get_xs_ph, get_ys_ph, maybe_to_array
from ares.loss.base import Loss
from ares.loss import CrossEntropyLoss
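
# Note: this module targets TensorFlow 1.x graph mode (tf.placeholder,
# tf.Session); it will not run under TensorFlow 2 without tf.compat.v1.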


class MyLoss(Loss):
    ''' C&W-style margin loss. Besides the per-example loss, it returns a stop
    mask marking which examples the model still classifies correctly. '''

    def __init__(self, model):
        self.model = model

    def __call__(self, xs, ys):
        logits, label = self.model._logits_and_labels(xs)
        # margin between the true-class logit and the largest competing logit
        mask = tf.one_hot(ys, depth=tf.shape(logits)[1])
        label_score = tf.reduce_sum(mask * logits, axis=1)
        second_scores = tf.reduce_max((1 - mask) * logits, axis=1)
        loss = -(label_score - second_scores)
        # 1.0 while the prediction still equals the true label, 0.0 once fooled
        stop_mask = tf.cast(tf.equal(label, ys), dtype=tf.float32)
        return loss, stop_mask
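
# Worked example of the margin loss on toy numbers (illustrative only):
#   logits = [2.0, 5.0, 1.0], ys = 1
#   label_score   = 5.0                   # logit of the true class
#   second_scores = 2.0                   # best competing logit
#   loss          = -(5.0 - 2.0) = -3.0
# Gradient ascent on this loss pushes the true-class logit below its strongest
# competitor. Note that (1 - mask) * logits zeroes the true-class entry, so
# second_scores is really max(0, best other logit); that is harmless for the
# positive logits above but is a quirk worth knowing about.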


class Attacker(BatchAttack):
    ''' BIM-style L_inf attack with a margin loss and per-example early
    stopping. Based on ares.attack.bim.BIM. '''

    def __init__(self, model, batch_size, dataset, session):
        # dataset is "imagenet" or "cifar10"
        self.model, self.batch_size, self._session = model, batch_size, session
        # loss = CrossEntropyLoss(self.model)  # alternative: plain cross-entropy
        loss = MyLoss(self.model)
        # placeholders for batch_attack's input
        self.xs_ph = get_xs_ph(model, batch_size)
        self.xs_adv_var_ph = get_xs_ph(model, batch_size)
        self.ys_ph = get_ys_ph(model, batch_size)
        # flattened shape of xs_ph
        xs_flatten_shape = (batch_size, np.prod(self.model.x_shape))
        # store xs and ys in variables to reduce memory copies between TensorFlow and Python
        # variable for the original examples, shape (batch_size, D)
        self.xs_var = tf.Variable(tf.zeros(shape=xs_flatten_shape, dtype=self.model.x_dtype))
        # variable for the labels
        self.ys_var = tf.Variable(tf.zeros(shape=(batch_size,), dtype=self.model.y_dtype))
        # variable for the (hopefully) adversarial examples, shape (batch_size, D)
        self.xs_adv_var = tf.Variable(tf.zeros(shape=xs_flatten_shape, dtype=self.model.x_dtype))
        # perturbation magnitude
        self.eps_ph = tf.placeholder(self.model.x_dtype, (self.batch_size,))
        self.eps_var = tf.Variable(tf.zeros((self.batch_size,), dtype=self.model.x_dtype))
        # step size
        self.alpha_ph = tf.placeholder(self.model.x_dtype, (self.batch_size,))
        self.alpha_var = tf.Variable(tf.zeros((self.batch_size,), dtype=self.model.x_dtype))
        # expand dims for easier broadcasting
        eps = tf.expand_dims(self.eps_var, 1)
        alpha = tf.expand_dims(self.alpha_var, 1)
        # L_inf ball around the original examples
        xs_lo, xs_hi = self.xs_var - eps, self.xs_var + eps
        self.xs_adv_model = tf.reshape(self.xs_adv_var, (batch_size, *self.model.x_shape))
        self.loss, self.stop_mask = loss(self.xs_adv_model, self.ys_var)
        self.stop_mask = tf.cast(self.stop_mask, dtype=tf.float32)
        self.stop_mask = tf.expand_dims(self.stop_mask, axis=1)
        # number of not-yet-fooled examples, built once here so the loop in
        # batch_attack does not keep adding ops to the graph
        self.stop_mask_sum = tf.reduce_sum(self.stop_mask)
        # gradient of the loss with respect to the adversarial examples,
        # grad.shape == (batch_size, D)
        grad = tf.gradients(self.loss, self.xs_adv_var)[0]
        # ascend along the gradient sign, but only for examples whose stop
        # mask is still 1.0
        grad_sign = tf.sign(grad)
        # clip by the maximum L_inf magnitude of the adversarial noise
        xs_adv_next = tf.clip_by_value(self.xs_adv_var + alpha * self.stop_mask * grad_sign, xs_lo, xs_hi)
        # clip to the valid input range (x_min, x_max)
        xs_adv_next = tf.clip_by_value(xs_adv_next, self.model.x_min, self.model.x_max)
        self.update_xs_adv_step = self.xs_adv_var.assign(xs_adv_next)
        self.config_eps_step = self.eps_var.assign(self.eps_ph)
        self.config_alpha_step = self.alpha_var.assign(self.alpha_ph)
        self.delta = self.init_delta(batch_size)
        self.setup_xs = [self.xs_var.assign(tf.reshape(self.xs_ph, xs_flatten_shape)),
                         self.xs_adv_var.assign(tf.reshape(self.xs_adv_var_ph, xs_flatten_shape))]
        self.setup_ys = self.ys_var.assign(self.ys_ph)
        self.iteration = 30
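
    # One attack step, informally:
    #   x_adv <- clip_{[x - eps, x + eps]}(x_adv + alpha * stop_mask * sign(grad))
    #   x_adv <- clip_{[x_min, x_max]}(x_adv)
    # i.e. BIM/PGD with an extra stop_mask factor that freezes examples the
    # model already misclassifies.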

    def config(self, **kwargs):
        if 'magnitude' in kwargs:
            # back off slightly from the exact budget so floating-point
            # rounding cannot push the perturbation outside it
            self.eps = kwargs['magnitude'] - 1e-6
            eps = maybe_to_array(self.eps, self.batch_size)
            self._session.run(self.config_eps_step, feed_dict={self.eps_ph: eps})
            # fixed step size of eps / 7
            self._session.run(self.config_alpha_step, feed_dict={self.alpha_ph: eps / 7})
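
    # Hypothetical configuration call (the 8/255 L_inf budget is just an
    # example value, not something this file prescribes):
    #   attacker.config(magnitude=8.0 / 255.0)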

    def init_delta(self, batch_size):
        ''' Build a fixed initial perturbation: for each image row, flip the
        sign of each channel independently with probability 0.5. '''
        x = np.zeros(self.model.x_shape)
        for i in range(x.shape[0]):
            c = [1, 1, 1]
            for j in range(3):
                if random.uniform(0, 1) < 0.5:
                    c[j] = -c[j]
            c = np.array(c)
            x[i, :, :] += c
        # broadcast the same pattern over the whole batch
        x = np.ones((batch_size, 1, 1, 1)) * x
        return x
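
    # For a 32x32x3 CIFAR-10 input this draws one random +-1 sign per channel
    # for each of the 32 image rows (e.g. a row may receive (+1, -1, +1)), and
    # every image in the batch shares the pattern. The first update step clips
    # this +-1 perturbation back into the eps-ball.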

    def batch_attack(self, xs, ys=None, ys_target=None):
        ''' Untargeted attack; ys_target is ignored. '''
        self._session.run(self.setup_xs, feed_dict={self.xs_ph: xs, self.xs_adv_var_ph: xs + self.delta})
        self._session.run(self.setup_ys, feed_dict={self.ys_ph: ys})
        for i in range(self.iteration):
            self._session.run(self.update_xs_adv_step)
            # sanity check, kept for reference:
            # xs_ph = tf.placeholder(self.model.x_dtype, shape=(None, *self.model.x_shape))
            # labels_op = self.model.labels(xs_ph)
            # xs_adv = self._session.run(self.xs_adv_model)
            # xs_adv = np.clip(xs_adv, xs - self.eps, xs + self.eps)
            # xs_adv = np.clip(xs_adv, self.model.x_min, self.model.x_max)
            # assert not np.any(np.isnan(xs_adv))
            # labels = self._session.run(labels_op, feed_dict={xs_ph: xs_adv})
            # print("in score", np.sum(np.logical_not(np.equal(labels, ys))))
            # progress log: how many examples the model still classifies correctly
            print(i, "stop_mask", self._session.run(self.stop_mask_sum))
        return self._session.run(self.xs_adv_model)
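
# --- Usage sketch (not part of the original file) ----------------------------
# load_cifar10_model below is a hypothetical stand-in for however the
# surrounding ARES benchmark constructs its model; only the call sequence is
# the point here:
#
#   session = tf.Session()
#   model = load_cifar10_model(session)  # hypothetical helper
#   attacker = Attacker(model, batch_size=100, dataset='cifar10', session=session)
#   attacker.config(magnitude=8.0 / 255.0)
#   xs_adv = attacker.batch_attack(xs, ys=ys)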