-
Notifications
You must be signed in to change notification settings - Fork 10
/
example_server.py
37 lines (27 loc) · 1005 Bytes
/
example_server.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
import socketserver
import threading
import time
class ThreadedTCPRequestHandler(socketserver.BaseRequestHandler):
    """Request handler that reports which local port received a connection."""

    def handle(self):
        """Print the local port of the socket serving this request.

        No data is read from or written to the connection.
        """
        # getsockname() yields (host, port) for IPv4 but
        # (host, port, flowinfo, scope_id) for IPv6; the port is always at
        # index 1, so index instead of unpacking a fixed-length tuple.
        server_port = self.request.getsockname()[1]
        print('Ping on {}'.format(server_port))
class ThreadedTCPServer(
    socketserver.ThreadingMixIn,
    socketserver.TCPServer,
):
    """TCP server combined with ThreadingMixIn; adds no behavior of its own."""
if __name__ == '__main__':
    # Demo entry point: start three TCP servers on OS-assigned ports, print
    # the ports, then idle until Ctrl-C and shut every server down.
    servers = []
    for _ in range(3):
        # Port 0 lets the operating system pick any free port.
        server = ThreadedTCPServer(('localhost', 0), ThreadedTCPRequestHandler)
        server_thread = threading.Thread(target=server.serve_forever)
        # Daemon thread: it must not keep the process alive on exit.
        server_thread.daemon = True
        server_thread.start()
        servers.append(server)

    ports = [server.server_address[1] for server in servers]
    # Join instead of a fixed '{} {} {}' template so the message stays
    # correct if the number of servers changes.
    print('Listening ports: {}'.format(' '.join(str(port) for port in ports)))

    try:
        # The main thread only has to stay alive; the servers run in
        # their own threads.
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        print()  # Jumps ^C char
    finally:
        for server in servers:
            print('Shutdown {}'.format(server.server_address[1]))
            # shutdown() stops the serve_forever() loop; server_close()
            # then releases the listening socket, which shutdown() alone
            # does not do.
            server.shutdown()
            server.server_close()