forked from casual-silva/practice
-
Notifications
You must be signed in to change notification settings - Fork 0
/
sockets.py
180 lines (136 loc) · 4.85 KB
/
sockets.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
# -*- encoding: utf-8 -*-
__author__ = "qaulau"
import sys
import os
import logging
from functools import wraps
try:
import json
except ImportError:
import simplejson as json
from gevent.pywsgi import WSGIServer
import geventwebsocket
from geventwebsocket.handler import WebSocketHandler
_logger = None
MAX_THREAD = 5
class WebSocketsLogHandler(logging.Handler):
def __init__(self, socket=None):
logging.Handler.__init__(self)
self.ws = socket
def update_ws(self, socket=None):
self.ws = socket
def flush(self):
pass
def emit(self, record):
try:
if self.ws is None or self.ws.closed:
return
msg = self.format(record)
data = {
'type': 'output' if record.levelname != 'ERROR' else 'error',
'msg': msg,
}
self.ws.send(json.dumps(data))
self.flush()
except (KeyboardInterrupt, SystemExit):
raise
except:
self.handleError(record)
class RedirectOutput(object):
"""重定向标准输出"""
def __init__(self, socket=None):
self.terminal = sys.stdout
self.ws = socket
def update_ws(self, socket=None):
self.ws = socket
def write(self, output_stream):
self.terminal.write(output_stream)
if self.ws is None or self.ws.closed:
return
if output_stream in ("\n", "\r", "\t"):
return
data = {
'type': 'output',
'msg': output_stream,
}
self.ws.send(json.dumps(data))
def flush(self):
pass
def reset(self):
pass
class SocketMiddleware(object):
def __init__(self, wsgi_app, socket):
self.ws = socket
self.app = wsgi_app
def __call__(self, environ, start_response):
path = environ['PATH_INFO']
if path in self.ws.url_map:
handler = self.ws.url_map[path]
environment = environ['wsgi.websocket']
try:
handler(environment)
except geventwebsocket.WebSocketError as e:
print e
return []
else:
return self.app(environ, start_response)
class Sockets(object):
def __init__(self, app=None):
self.url_map = {}
if app:
self.init_app(app)
def init_app(self, app):
app.wsgi_app = SocketMiddleware(app.wsgi_app, self)
def route(self, rule, **options):
def decorator(f):
endpoint = options.pop('endpoint', None)
self.add_url_rule(rule, endpoint, f, **options)
return f
return decorator
def add_url_rule(self, rule, _, f, **options):
self.url_map[rule] = f
class WebSockrtLogMaker(object):
"""websocket日志监听生成器
用于处理日志数据量输出至websocket
"""
def __init__(self, logger_name, socket=None):
global _logger
if _logger is None:
_logger = logging.getLogger(logger_name)
_logger.setLevel(logging.DEBUG)
_logger.addHandler(logging.StreamHandler())
self.wslh = WebSocketsLogHandler(socket)
_logger.addHandler(self.wslh)
def update_ws(self, socket):
self.wslh.update_ws(socket)
def make_server(host, port, app=None, request_handler=None):
"""创建wsgi服务和websocket服务"""
return WSGIServer((host, port), app, handler_class=request_handler)
def run_server(hostname, port, application, use_reloader=False,
reloader_interval=1, static_files=None,
request_handler=WebSocketHandler):
""""运行服务器"""
import werkzeug
from werkzeug._internal import _log
from werkzeug.serving import select_ip_version, socket, run_with_reloader
if static_files:
from werkzeug.wsgi import SharedDataMiddleware
application = SharedDataMiddleware(application, static_files)
def inner():
make_server(hostname, port, application, request_handler).serve_forever()
if os.environ.get('WERKZEUG_RUN_MAIN') != 'true':
display_hostname = hostname != '*' and hostname or 'localhost'
if ':' in display_hostname:
display_hostname = '[%s]' % display_hostname
_log('info', ' * Running on http://%s:%d/', display_hostname, port)
if request_handler and hasattr(request_handler, 'run_websocket'):
_log('info', ' * Running on ws://%s:%d/', display_hostname, port)
if use_reloader:
address_family = select_ip_version(hostname, port)
test_socket = socket.socket(address_family, socket.SOCK_STREAM)
test_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
test_socket.bind((hostname, port))
test_socket.close()
run_with_reloader(inner, None, reloader_interval)
else:
inner()