# adaptive_privacy_engine.py
from typing import List, Optional, Tuple, Union

import numpy as np
from opacus import PrivacyEngine
from opacus import privacy_analysis as tf_privacy

class AdaptivePrivacyEngine(PrivacyEngine):
    """PrivacyEngine that keeps accounting correct across changes of batch size and noise."""

    def __init__(self, *args, n_accumulation_steps=1, **kwargs):
        super(AdaptivePrivacyEngine, self).__init__(*args, **kwargs)
        # Ledger mapping (sample_rate, noise_multiplier) -> number of steps
        # taken under those parameters.
        self.privacy_ledger = {}
        if 'sample_size' in kwargs:
            self.sample_size = kwargs['sample_size']
        else:
            # Assumes the underlying PrivacyEngine's positional layout:
            # (module, batch_size, sample_size, ...).
            self.sample_size = args[2]
        self.n_accumulation_steps = n_accumulation_steps

    def update_batch_size(self, new_batch_size, new_n_accumulation_steps):
        # Commit the steps taken at the old parameters before switching.
        self._commit_to_privacy_ledger()
        self.batch_size = new_batch_size
        self.n_accumulation_steps = new_n_accumulation_steps
        self.sample_rate = self.batch_size / self.sample_size

    def update_noise_multiplier(self, new_noise_multiplier):
        self._commit_to_privacy_ledger()
        self.noise_multiplier = new_noise_multiplier

    def _commit_to_privacy_ledger(self):
        # Fold the steps taken since the last commit into the ledger entry for
        # the current (sample_rate, noise_multiplier) pair.
        privacy_ledger_key = (self.sample_rate, self.noise_multiplier)
        if privacy_ledger_key not in self.privacy_ledger:
            self.privacy_ledger[privacy_ledger_key] = 0
        self.privacy_ledger[privacy_ledger_key] += self.steps
        self.steps = 0
    def get_renyi_divergence(self, sample_rate, noise_multiplier):
        # RDP of a single step of the subsampled Gaussian mechanism, at every alpha.
        return tf_privacy.compute_rdp(
            sample_rate, noise_multiplier, 1, self.alphas
        )

    def add_query_to_ledger(self, sample_rate, noise_multiplier, n):
        # Record n steps (possibly negative, to undo a tentative query) taken at
        # the given (sample_rate, noise_multiplier).
        privacy_ledger_key = (sample_rate, noise_multiplier)
        if privacy_ledger_key not in self.privacy_ledger:
            self.privacy_ledger[privacy_ledger_key] = 0
        self.privacy_ledger[privacy_ledger_key] += n
    def get_privacy_spent(
        self, target_delta: Optional[float] = None
    ) -> Tuple[float, float]:
        """
        Computes the (epsilon, delta) privacy budget spent so far.

        This method converts from an (alpha, epsilon)-RDP guarantee for all alphas that
        the ``PrivacyEngine`` was initialized with. It returns the optimal alpha together
        with the best epsilon.

        Args:
            target_delta: The target delta. If None, it defaults to the privacy
                engine's target delta.

        Returns:
            Pair of epsilon and optimal order alpha.
        """
        if target_delta is None:
            target_delta = self.target_delta
        self._commit_to_privacy_ledger()
        # RDP composes additively: sum the per-step RDP of every
        # (sample_rate, noise_multiplier) regime recorded in the ledger.
        rdp = 0.
        for (sample_rate, noise_multiplier), steps in self.privacy_ledger.items():
            rdp += self.get_renyi_divergence(sample_rate, noise_multiplier) * steps
        return tf_privacy.get_privacy_spent(self.alphas, rdp, target_delta)
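
# Illustrative sketch (not part of the original module): how an
# AdaptivePrivacyEngine might be attached and updated mid-training. The model,
# optimizer, and every hyperparameter value below are hypothetical placeholders,
# and the constructor signature assumed here is the pre-1.0 Opacus one
# (module, batch_size, sample_size, alphas, noise_multiplier, max_grad_norm, ...).
def _demo_adaptive_privacy_engine():
    import torch

    model = torch.nn.Linear(10, 1)
    optimizer = torch.optim.SGD(model.parameters(), lr=0.1)
    engine = AdaptivePrivacyEngine(
        model,
        batch_size=64,
        sample_size=50_000,
        alphas=[1.5, 2, 4, 8, 16, 32, 64],
        noise_multiplier=1.1,
        max_grad_norm=1.0,
        target_delta=1e-5,
    )
    engine.attach(optimizer)

    # ... train for a while, then switch regimes; the steps already taken are
    # committed to the ledger under the old (sample_rate, noise_multiplier).
    engine.update_batch_size(new_batch_size=128, new_n_accumulation_steps=1)
    engine.update_noise_multiplier(new_noise_multiplier=0.9)

    eps, best_alpha = engine.get_privacy_spent()
    print(f"spent epsilon={eps:.2f} at alpha={best_alpha}")
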

class PrivacyFilterEngine(AdaptivePrivacyEngine):
    """Privacy filter: refuses to take a step once a fixed epsilon budget would be exceeded."""

    def __init__(self, epsilon, *args, **kwargs):
        super(PrivacyFilterEngine, self).__init__(*args, **kwargs)
        self.epsilon = epsilon

    def halt(
        self,
        batch_size: Optional[int] = None,
        sample_rate: Optional[float] = None,
        noise_multiplier: Optional[float] = None,
        steps: Optional[int] = 1,
    ) -> bool:
        r"""
        Returns whether the filter would halt if asked to perform ``steps`` more steps
        at the proposed batch_size/sample_rate and noise_multiplier. If these are None,
        the current PrivacyEngine values are used.

        Args:
            batch_size: The proposed new query batch size.
            sample_rate: The proposed new query sample rate (batch_size and
                sample_rate cannot both be given).
            noise_multiplier: The proposed new query noise multiplier.
            steps: Number of steps within which the filter would halt.

        Returns:
            True (halt) or False (don't halt).
        """
        assert batch_size is None or sample_rate is None
        # TODO: implement through a max epsilon for each order alpha, and a
        # direct check of positivity for at least one alpha. Should be much more
        # efficient.
        if batch_size is not None:
            sample_rate = batch_size / self.sample_size
        elif sample_rate is None:
            sample_rate = self.sample_rate
        if noise_multiplier is None:
            noise_multiplier = self.noise_multiplier
        self._commit_to_privacy_ledger()
        # Tentatively add the proposed query, check the budget, then undo it.
        self.add_query_to_ledger(sample_rate, noise_multiplier, steps)
        halt = self.get_privacy_spent(target_delta=self.target_delta)[0] > self.epsilon
        self.add_query_to_ledger(sample_rate, noise_multiplier, -steps)
        return halt

    def step(self):
        # Only take the step if it keeps the spent epsilon within the budget.
        if not self.halt():
            super(PrivacyFilterEngine, self).step()
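
# Illustrative sketch (not part of the original module): using the filter to
# stop training once the epsilon budget is exhausted. The model, optimizer,
# data_loader, loss_fn and all hyperparameters are hypothetical placeholders,
# and the constructor signature assumed is again the pre-1.0 Opacus one.
def _demo_privacy_filter(model, optimizer, data_loader, loss_fn):
    engine = PrivacyFilterEngine(
        3.0,  # epsilon budget enforced by the filter
        model,
        batch_size=64,
        sample_size=50_000,
        alphas=[1.5, 2, 4, 8, 16, 32, 64],
        noise_multiplier=1.1,
        max_grad_norm=1.0,
        target_delta=1e-5,
    )
    engine.attach(optimizer)

    for batch, target in data_loader:
        if engine.halt():
            # One more step at the current parameters would exceed epsilon.
            break
        optimizer.zero_grad()
        loss_fn(model(batch), target).backward()
        optimizer.step()  # the attached optimizer routes through engine.step()
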

class PrivacyOdometerEngine(AdaptivePrivacyEngine):
    """Privacy odometer: reports a running upper bound on the epsilon spent so far."""

    def __init__(
        self,
        *args,
        **kwargs,
    ):
        r"""
        Args:
            *args: Arguments for the underlying PrivacyEngine. See
                https://opacus.ai/api/privacy_engine.html.
            **kwargs: Keyword arguments for the underlying PrivacyEngine.
        """
        super(PrivacyOdometerEngine, self).__init__(*args, **kwargs)
        # Per-alpha discretization scale for the odometer; the target delta is
        # split across the alphas and the doubling buckets used below.
        self.gamma = 2 ** -2 * np.log(2 * len(self.alphas) / self.target_delta) / (np.atleast_1d(self.alphas) - 1)

    def get_privacy_spent(self) -> Tuple[float, float]:
        """
        Computes the (epsilon, delta) privacy budget spent so far.

        This method converts from an (alpha, epsilon)-RDP guarantee for all alphas that
        the ``PrivacyEngine`` was initialized with, at the privacy engine's target delta.
        It returns the optimal alpha together with the best epsilon.

        Returns:
            Pair of epsilon and optimal order alpha.
        """
        self._commit_to_privacy_ledger()
        rdp = 0.
        for (sample_rate, noise_multiplier), steps in self.privacy_ledger.items():
            rdp += self.get_renyi_divergence(sample_rate, noise_multiplier) * steps
        # Round the accumulated RDP up to the next bucket gamma * 2**f, and charge
        # each bucket its own share of the target delta.
        rdp = np.maximum(rdp, self.gamma)
        f = np.ceil(np.log2(rdp / self.gamma))
        target_delta = self.target_delta / (len(self.alphas) * 2 * np.power(f + 1, 2))
        rdp = self.gamma * np.exp2(f)
        return self.get_privacy_spent_heterogeneous_delta(self.alphas, rdp, target_delta)
    def get_privacy_spent_heterogeneous_delta(
        self, orders: Union[List[float], float], rdp: Union[List[float], float], delta: Union[List[float], float],
    ) -> Tuple[float, float]:
        r"""Computes epsilon given a list of Renyi Differential Privacy (RDP) values at
        multiple RDP orders, with a (possibly different) target ``delta`` per order.

        Args:
            orders: An array (or a scalar) of orders (alphas).
            rdp: A list (or a scalar) of RDP guarantees.
            delta: A list (or a scalar) of target deltas, one per order.

        Returns:
            Pair of epsilon and optimal order alpha.

        Raises:
            ValueError
                If the lengths of ``orders``, ``rdp`` and ``delta`` are not equal.
        """
        orders_vec = np.atleast_1d(orders)
        rdp_vec = np.atleast_1d(rdp)
        delta_vec = np.atleast_1d(delta)

        if len(orders_vec) != len(rdp_vec) or len(orders_vec) != len(delta_vec):
            raise ValueError(
                f"Input lists must have the same length.\n"
                f"\torders_vec = {orders_vec}\n"
                f"\trdp_vec = {rdp_vec}\n"
                f"\tdelta_vec = {delta_vec}\n"
            )

        # Standard RDP-to-DP conversion, applied per order with its own delta:
        # eps(alpha) = rdp(alpha) - log(delta(alpha)) / (alpha - 1).
        eps = rdp_vec - np.log(delta_vec) / (orders_vec - 1)

        # special case when there is no privacy
        if np.isnan(eps).all():
            return np.inf, np.nan

        idx_opt = np.nanargmin(eps)  # Ignore NaNs
        return eps[idx_opt], orders_vec[idx_opt]
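
# Illustrative sketch (not part of the original module): the odometer's
# get_privacy_spent takes no target_delta argument and can be queried at any
# point during training. Names and values below are hypothetical placeholders;
# the ledger is exercised directly instead of running a real training loop.
def _demo_privacy_odometer():
    import torch

    model = torch.nn.Linear(10, 1)
    optimizer = torch.optim.SGD(model.parameters(), lr=0.1)
    engine = PrivacyOdometerEngine(
        model,
        batch_size=64,
        sample_size=50_000,
        alphas=[1.5, 2, 4, 8, 16, 32, 64],
        noise_multiplier=1.1,
        max_grad_norm=1.0,
        target_delta=1e-5,
    )
    engine.attach(optimizer)

    # Pretend 1000 steps were taken at the current parameters.
    engine.add_query_to_ledger(engine.sample_rate, engine.noise_multiplier, 1000)
    eps, best_alpha = engine.get_privacy_spent()
    print(f"odometer bound: epsilon={eps:.2f} at alpha={best_alpha}")


if __name__ == "__main__":
    _demo_privacy_odometer()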