-
Notifications
You must be signed in to change notification settings - Fork 0
/
global_controller.py
69 lines (60 loc) · 2.13 KB
/
global_controller.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
import threading
import socket
from bh_controller import MessageStream
import pickle
import queue
import logging
# Module logger: INFO level, written to a stream handler with a
# "<time> - <logger name>: <message>" format.
logger = logging.getLogger('BH Controller')
logger.setLevel(logging.INFO)
ch = logging.StreamHandler()
ch.setFormatter(logging.Formatter('%(asctime)s - %(name)s: %(message)s'))
logger.addHandler(ch)
# Seconds serve_task waits for a granted token to be returned before it
# moves on to the next requester.
TIME_SLICE = 5
# Listening TCP socket on all interfaces, port 9999.  Options are set before
# bind(): SO_REUSEADDR on the listening address, TCP_NODELAY on the socket.
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
sock.bind(('0.0.0.0', 9999))
sock.listen()
# Message streams of clients that sent 'acquire', consumed in FIFO order by
# serve_task.
request_queue = queue.Queue()
# Number of the most recently granted token; serve_task advances it for
# every grant.
token = 0
# Taken around every update of, and comparison against, `token`.
token_lock = threading.Lock()
# Notified by a connection handler when the current token is released;
# serve_task waits on it (with a timeout) after each grant.
token_cv = threading.Condition()
def serve_task():
    """Grant the token to queued requesters, one at a time, forever.

    For every message stream taken from ``request_queue`` the loop advances
    the global ``token``, sends ``('permit', token)`` to that requester and
    then waits up to ``TIME_SLICE`` seconds on ``token_cv`` for the token to
    be returned before serving the next requester.  A requester that cannot
    be reached is logged and skipped.  This function never returns.
    """
    global token
    while True:
        requester = request_queue.get()
        # bookkeeping: advance the token under the same lock the connection
        # handlers hold while comparing a released token against it
        with token_lock:
            # integer modulo wraps the counter without a float round-trip
            token = (token + 1) % 10 ** 9
            current = token
        # token_cv is held from before the permit is sent until wait()
        # releases it.  A handler must take token_cv to notify, so a very
        # fast 'release' can no longer fire before this thread is waiting
        # (which previously cost a full TIME_SLICE).
        with token_cv:
            try:
                # send token to the requester
                requester.send(pickle.dumps(('permit', current)))
                peer = requester.sock.getpeername()
            except OSError as exc:
                # The requester may have disconnected while queued.  Skip it
                # rather than let the exception end the only serving thread.
                # NOTE(review): assumes MessageStream.send reports socket
                # failures as OSError -- confirm.
                logger.info(f'token {current} not delivered ({exc!r}), skip')
                continue
            logger.info(f'token {current} acquired by {peer}')
            # wait for token to be returned
            token_returned = token_cv.wait(timeout=TIME_SLICE)
        if token_returned:
            logger.info(f'token {current} returned in time')
        else:
            logger.info(f'token {current} timeout, ignore')
# Run serve_task on its own thread so the main thread stays free to accept
# client connections below.
serve_thread = threading.Thread(target=serve_task)
serve_thread.start()
def f(msg_stream):
    """Handle one client connection until it goes away.

    Each received message is unpickled into a ``(msg, tok)`` pair:

    * ``'acquire'`` -- the stream is put on ``request_queue`` so that
      serve_task will eventually send it a permit.
    * ``'release'`` -- if ``tok`` is the currently granted token, the
      serving thread waiting on ``token_cv`` is notified; a stale token is
      only logged.

    Args:
        msg_stream: MessageStream wrapping the accepted client socket.
    """
    while True:
        try:
            # NOTE(security): pickle.loads on bytes read from the network can
            # execute arbitrary code; only expose this port to trusted peers.
            msg, tok = pickle.loads(msg_stream.recv())
        except (EOFError, OSError, pickle.UnpicklingError) as exc:
            # NOTE(review): presumably a closed connection shows up here as
            # EOFError (empty payload) or OSError -- confirm against
            # MessageStream.recv.  The socket is not closed here because the
            # stream may still be sitting in request_queue.
            logger.info(f'connection handler stopped: {exc!r}')
            return
        if msg == 'acquire':
            request_queue.put(msg_stream)
        elif msg == 'release':
            logger.info(f'token {tok} returned by {msg_stream.sock.getpeername()}')
            # notify if applicable; `with` guarantees token_lock is released
            # even if comparing against the unpickled value raises
            with token_lock:
                if token == tok:
                    with token_cv:
                        token_cv.notify()


# Accept clients forever, giving each connection its own handler thread.
while True:
    client_sock, addr = sock.accept()
    logger.info(f'connection from {addr}')
    msg_stream = MessageStream(client_sock, use_controller=False)
    thread = threading.Thread(target=f, args=(msg_stream,))
    thread.start()