forked from SeattleTestbed/seattlelib_v2
-
Notifications
You must be signed in to change notification settings - Fork 0
/
advertise.r2py
388 lines (299 loc) · 13.5 KB
/
advertise.r2py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
"""
<Program Name>
advertise.r2py
<Started>
October 14, 2008
<Author>
Justin Cappos
<Purpose>
Module which allows clients to send advertise queries to various servers.
"""
listops = dy_import_module("listops.r2py")
centralizedadvertise = dy_import_module("centralizedadvertise.r2py")
centralizedadvertise_v2 = dy_import_module("centralizedadvertise_v2.r2py")
parallelize = dy_import_module("parallelize.r2py")
udp_centralizedadvertise = dy_import_module("udpcentralizedadvertise.r2py")
# Names of every advertise service this module knows how to talk to.
# As of January 2012, openDHT is no longer a default service.
_advertise_all_services = ("central", "central_v2", "UDP")

# Back-off bookkeeping for announces, two counters per service:
#   "skip<service>"          announce rounds still to be skipped for the
#                            service (advertise_announce counts it down)
#   "previous<service>skip"  back-off multiplier; _try_advertise_announce
#                            resets it to 1 on success and doubles it
#                            (up to 16) on failure
nodemanager_announce_context = {}
for service in _advertise_all_services:
  nodemanager_announce_context.update({
      "skip" + service: 0,
      "previous" + service + "skip": 1,
  })

# Held whenever the counters above are modified.
nodemanager_announce_context_lock = createlock()
class AdvertiseError(Exception):
  """Signals that an error occurred while advertising."""
def _try_advertise_announce(args):
"""
<Purpose>
Helper function to be used in parallel with other advertise requests.
This is the function we pass to parallelize to perform simultaneous
queries.
<Arguments>
args (tuple)
A tuple containing the following:
which_service (string)
The service we should use to advertise, such as "central" or "UDP".
key (string)
The advertisement key. For most nodes, this will be a public key.
value (string)
The advertisement value to be assigned to key.
ttlval (int)
Time To Live for this advertisement, in seconds.
exceptions (List reference, should literally be [''])
An empty list reference which will be have exception data in its zero
index if something goes wrong. Due to this method's parallelized nature,
we cannot simply return this data; it is not invoked by this module.
finishedref (List with boolean in zero index)
This function sets finishedref[0] = true when it has completed
successfully. This is used as a flag so that we know when to return
advertise data to the client later.
<Exceptions>
AdvertiseError
If an invalid service type is specified, this exception will be raised.
ValueError
Too many, or too few values passed in the args tuple.
<Side Effects>
Contingent on the side effects of the modules invoked for different
services, this consumes an outsocket and insocket on use. Therefore,
invoking too many instances of these in parallel can lead to crashing
if the application exceeds its allotted socket count.
The number of sockets permitted to an application is determined by
its associated restrictions file.
<Returns>
None
"""
# ValueError if there are too many or too few values.
which_service, key, value, ttlval, exceptions, finishedref = args
if which_service not in _advertise_all_services:
raise AdvertiseError("Incorrect service type used in internal function _try_advertise_announce.")
try:
if which_service == "central":
centralizedadvertise.centralizedadvertise_announce(key, value, ttlval)
elif which_service == "central_v2":
centralizedadvertise_v2.v2centralizedadvertise_announce(key, value, ttlval)
elif which_service == "UDP":
udp_centralizedadvertise.udpcentralizedadvertise_announce(key, value, ttlval)
else:
# This should be redundant with the previous explicit AdvertiseError.
# One cannot (usually) be too careful.
raise AdvertiseError("Did not understand service type.")
finishedref[0] = True # Indicate that this instance has finished.
nodemanager_announce_context_lock.acquire(True)
try:
nodemanager_announce_context["previous" + which_service + "skip"] = 1
finally:
nodemanager_announce_context_lock.release()
except Exception, e:
nodemanager_announce_context_lock.acquire(True)
try:
exceptions[0] += 'announce error (type: ' + which_service + '): ' + str(e)
nodemanager_announce_context["skip" + which_service] = \
nodemanager_announce_context["previous" + which_service + "skip"] + 1
nodemanager_announce_context["previous" + which_service + "skip"] = \
min(nodemanager_announce_context["previous" + which_service + "skip"] * 2, 16)
finally:
nodemanager_announce_context_lock.release()
def advertise_announce(key, value, ttlval, concurrentevents=4,
    graceperiod=5, timeout=30):
  """
  <Purpose>
    Announce (PUT) a key : value pair to all default advertise services.

  <Arguments>
    key (string)
      The key for our advertise dictionary entry. Converted with str().
    value (string)
      The value for our advertise dictionary entry. Converted with str().
    ttlval (int)
      Time in seconds to persist the associated key<->value pair.
    concurrentevents (int) (optional)
      How many services to announce on in parallel; handed to parallelize.
    graceperiod (float) (optional)
      "Soft" timeout in seconds. Once at least one announce has been
      confirmed, this function stops waiting after graceperiod seconds.
      Announces that are still running at that point are not terminated
      by this function; they finish on their own. Calling this function
      in rapid succession with a very small graceperiod can therefore
      leave many announces in flight (the original notes warn that this
      may exceed the allotted outsockets).
    timeout (int) (optional)
      "Hard" timeout in seconds. Waiting stops after timeout seconds even
      if no announce has been confirmed.

  <Exceptions>
    AdvertiseError if no announce was confirmed, or if any announce
    attempt recorded an error (even when another service succeeded).

  <Side Effects>
    Starts one parallelize job per service in _advertise_all_services that
    is not currently backing off, with at most concurrentevents of them
    running at once. For every service that is backing off, its "skip"
    counter in nodemanager_announce_context is decremented instead.

  <Returns>
    None.
  """
  # convert different types to strings to avoid type conversion errors #874
  key = str(key)
  value = str(value)

  # One-element lists so the workers can write results back to us.
  exceptions = ['']       # error text collected from failed workers
  onefinished = [False]   # becomes [True] once any worker succeeds

  pending_jobs = []
  start_time = getruntime()

  # Queue a job for every service that is not being skipped this round.
  for service_type in _advertise_all_services:
    skip_key = "skip" + service_type

    if nodemanager_announce_context[skip_key] != 0:
      # The service is backing off: use up one skipped round.
      nodemanager_announce_context_lock.acquire(True)
      try:
        nodemanager_announce_context[skip_key] -= 1
      finally:
        nodemanager_announce_context_lock.release()
      continue

    pending_jobs.append((service_type, key, value, ttlval,
        exceptions, onefinished))

  # Run the jobs, no more than concurrentevents at once.
  ph = parallelize.parallelize_initfunction(pending_jobs,
      _try_advertise_announce, concurrentevents=concurrentevents)

  # Wait until all jobs are done, the hard timeout expires, or the grace
  # period expires with at least one confirmed announce.
  while not parallelize.parallelize_isfunctionfinished(ph):
    sleep(0.015)

    if getruntime() - start_time > timeout:
      stop_waiting = True
    else:
      stop_waiting = (getruntime() - start_time > graceperiod and
          onefinished[0])

    if stop_waiting:
      parallelize.parallelize_abortfunction(ph)
      break

  # This does not terminate all parallel threads; do not assume it does.
  parallelize.parallelize_closefunction(ph)

  # Did any service confirm the announce?
  if onefinished == [False]:
    raise AdvertiseError("None of the advertise services could be contacted")

  # Report errors recorded by the workers.
  if exceptions[0] != '':
    raise AdvertiseError(str(exceptions))

  return None
def _try_advertise_lookup(args):
  """
  <Purpose>
    Worker routine that performs one lookup on a single advertise service.
    advertise_lookup() hands this function to parallelize so that several
    services can be queried at the same time.

  <Arguments>
    args (4-tuple)
      which_service (string)
        The service on which to perform the lookup. Must be one of the
        entries of _advertise_all_services.
      key (string)
        The key to retrieve values for.
      maxvals (int)
        The maximum number of entries to request from the service.
      finishedref (list holding one boolean)
        finishedref[0] is set to True when the lookup call returned
        without raising.

  <Exceptions>
    AdvertiseError
      If which_service is not a known service name.
    ValueError
      If args does not contain exactly four items.
    Errors raised by the lookup itself are not propagated (see Returns).

  <Side Effects>
    Sets finishedref[0]; anything else depends on the service module.

  <Returns>
    Whatever the service's lookup function returned, or an empty list if
    the lookup raised any Exception.
  """
  which_service, key, maxvals, finishedref = args

  if which_service not in _advertise_all_services:
    raise AdvertiseError("Incorrect service type used in internal function _try_advertise_lookup.")

  try:
    if which_service == "central":
      found = centralizedadvertise.centralizedadvertise_lookup(key, maxvals)
    elif which_service == "central_v2":
      found = centralizedadvertise_v2.v2centralizedadvertise_lookup(key, maxvals)
    elif which_service == "UDP":
      found = udp_centralizedadvertise.udpcentralizedadvertise_lookup(key, maxvals)
    else:
      raise AdvertiseError("Did not understand service type!")

    # Let advertise_lookup() know that one service answered.
    finishedref[0] = True
    return found

  except Exception:
    # Best effort: a failing service simply contributes no values.
    return []
def advertise_lookup(key, maxvals=100, lookuptype=None,
    concurrentevents=4, graceperiod=5, timeout=60):
  """
  <Purpose>
    Lookup (GET) (a) value(s) stored at the given key in the central
    advertise server, central advertise server V2, UDP, or any
    combination of them.

  <Arguments>
    key
      The key used to lookup values. It is converted with str() first.
    maxvals (optional, defaults to 100):
      Maximum number of values to request from each service. The answers
      of all services are merged, so the limit applies per service.
    lookuptype (optional, defaults to ['central', 'central_v2', 'UDP']):
      List of the services to employ looking up values.
    concurrentevents (optional, defaults to 4):
      How many services to lookup on in parallel.
    graceperiod (optional, defaults to 5):
      After this many seconds (can be a float or int type), return the
      results if one service was reached successfully.
    timeout (optional, defaults to 60):
      After this many seconds (can be a float or int type), stop waiting.
      Results obtained so far are returned; if there are none,
      TimeoutError is raised.

  <Exceptions>
    AdvertiseError if lookuptype contains an unknown service name.
    TimeoutError if no service returned an answer. Because the worker
    function swallows the individual errors, a TimeoutError is raised both
    when the timeout expired and when every service failed for another
    reason (see #1329). If one service succeeds, no exception is raised.

  <Side Effects>
    Spawns as many worker events as concurrentevents specifies, limited by
    the number of services in lookuptype.

  <Returns>
    All unique values stored at the key.
  """
  # convert different types to strings to avoid type conversion errors #874
  key = str(key)

  # As of January 2012, DHT is no longer a default service.
  if lookuptype is None:
    lookuptype = ['central', 'central_v2', 'UDP']

  start_time = getruntime()

  # Shared flag; a worker sets onefinished[0] = True when its lookup worked.
  onefinished = [False]

  # Populate parallel jobs list, rejecting unknown services up front.
  parallel_worksets = []
  for servicetype in lookuptype:
    if servicetype not in _advertise_all_services:
      # str() so that a non-string entry is reported as AdvertiseError too.
      raise AdvertiseError("Incorrect service type '" + str(servicetype) + "' passed to advertise_lookup().")
    parallel_worksets.append((servicetype, key, maxvals, onefinished))

  # Start parallel jobs.
  ph = parallelize.parallelize_initfunction(parallel_worksets,
      _try_advertise_lookup, concurrentevents=concurrentevents)

  # From here on the handle must be closed on every path, including the
  # timeout exception below.
  try:
    # Wait until either timeout or graceperiod with at least one service
    # success, and then continue.
    while not parallelize.parallelize_isfunctionfinished(ph):
      sleep(0.015)
      elapsed = getruntime() - start_time

      if elapsed > timeout:
        # This timed out. Time to abort. Fix for #1329.
        parallelize.parallelize_abortfunction(ph)
        if not onefinished[0]:
          raise TimeoutError("Advertise lookup timed out without contacting any service")
        # A service did answer: fall through and return what we have.
        break

      if elapsed > graceperiod and onefinished[0]:
        # This hit the grace period. (At least one service worked, but not
        # as many as were requested.)
        parallelize.parallelize_abortfunction(ph)
        break

    parallel_results = parallelize.parallelize_getresults(ph)['returned']

    # Construct a list of return results.
    results = []
    atleastonecorrectresult = False
    for requestinfo, return_value in parallel_results:
      # requestinfo is the job tuple; index 3 is the shared onefinished
      # list, so this is true for every job once any lookup succeeded.
      # Failed lookups return [], hence adding them is harmless.
      if requestinfo[3] == [True]:
        atleastonecorrectresult = True
        results += return_value
  finally:
    parallelize.parallelize_closefunction(ph)

  if not atleastonecorrectresult:
    # JAC: I should check the errors here and raise TimeoutError only if
    # timeout was actually the issue. However with the way this is written
    # with the helper function, all exception information is lost.
    raise TimeoutError('Error performing lookup. No services succeeded.')

  # Filter results and return.
  return listops.listops_uniq(results)