diff --git a/debian/control b/debian/control index a6d14b00..f49a6870 100644 --- a/debian/control +++ b/debian/control @@ -16,7 +16,8 @@ Depends: ${misc:Depends}, ${python3:Depends}, python3-psutil, python3-tz, python3-prompt-toolkit, - python3-pygments + python3-pygments, + python3-zeromq Breaks: yunohost (<< 4.1) Description: prototype interfaces with ease in Python Quickly and easily prototype interfaces for your application. diff --git a/moulinette/interfaces/api.py b/moulinette/interfaces/api.py index b68b1898..87d8b31c 100644 --- a/moulinette/interfaces/api.py +++ b/moulinette/interfaces/api.py @@ -65,47 +65,6 @@ def wrapper(*args, **kwargs): return wrapper -class LogQueues(dict): - """Map of session ids to queue.""" - - pass - - -class APIQueueHandler(logging.Handler): - """ - A handler class which store logging records into a queue, to be used - and retrieved from the API. - """ - - def __init__(self): - logging.Handler.__init__(self) - self.queues = LogQueues() - # actionsmap is actually set during the interface's init ... - self.actionsmap = None - - def emit(self, record): - # Prevent triggering this function while moulinette - # is being initialized with --debug - if not self.actionsmap or len(request.cookies) == 0: - return - - profile = request.params.get("profile", self.actionsmap.default_authentication) - authenticator = self.actionsmap.get_authenticator(profile) - - s_id = authenticator.get_session_cookie(raise_if_no_session_exists=False)["id"] - try: - queue = self.queues[s_id] - except KeyError: - # Session is not initialized, abandon. - return - else: - # Put the message as a 2-tuple in the queue - queue.put_nowait((record.levelname.lower(), record.getMessage())) - # Put the current greenlet to sleep for 0 second in order to - # populate the new message in the queue - sleep(0) - - class _HTTPArgumentParser: """Argument parser for HTTP requests @@ -249,9 +208,8 @@ class _ActionsMapPlugin: name = "actionsmap" api = 2 - def __init__(self, actionsmap, log_queues={}): + def __init__(self, actionsmap): self.actionsmap = actionsmap - self.log_queues = log_queues def setup(self, app): """Setup plugin on the application @@ -280,11 +238,10 @@ def setup(self, app): skip=["actionsmap"], ) - # Append messages route app.route( - "/messages", - name="messages", - callback=self.messages, + "/sse", + name="sse", + callback=self.sse, skip=["actionsmap"], ) @@ -421,46 +378,36 @@ def logout(self): else: return m18n.g("logged_in") - def messages(self): - """Listen to the messages WebSocket stream - - Retrieve the WebSocket stream and send to it each messages displayed by - the display method. They are JSON encoded as a dict { style: message }. - """ + def sse(self): + import zmq.green as zmq profile = request.params.get("profile", self.actionsmap.default_authentication) authenticator = self.actionsmap.get_authenticator(profile) - s_id = authenticator.get_session_cookie()["id"] try: - queue = self.log_queues[s_id] + authenticator.get_session_cookie() except KeyError: - # Create a new queue for the session - queue = Queue() - self.log_queues[s_id] = queue + raise HTTPResponse(m18n.g("not_logged_in"), 401) - wsock = request.environ.get("wsgi.websocket") - if not wsock: - raise HTTPResponse(m18n.g("websocket_request_expected"), 500) + ctx = zmq.Context() + sub = ctx.socket(zmq.SUB) + sub.subscribe('') + sub.connect(log.LOG_BROKER_FRONTEND_ENDPOINT) - while True: - item = queue.get() - try: - # Retrieve the message - style, message = item - except TypeError: - if item == StopIteration: - # Delete the current queue and break - del self.log_queues[s_id] - break - logger.exception("invalid item in the messages queue: %r", item) - else: - try: - # Send the message - wsock.send(json_encode({style: message})) - except WebSocketError: - break - sleep(0) + response.content_type = 'text/event-stream' + response.cache_control = 'no-cache' + + # Set client-side auto-reconnect timeout, ms. + yield 'retry: 100\n\n' + + try: + while True: + if sub.poll(10, zmq.POLLIN): + _, msg = sub.recv_multipart() + yield 'data: ' + str(msg.decode()) + '\n\n' + finally: + sub.close() + ctx.term() def process(self, _route, arguments={}): """Process the relevant action for the route @@ -498,37 +445,9 @@ def process(self, _route, arguments={}): rmtree(UPLOAD_DIR, True) UPLOAD_DIR = None - # Close opened WebSocket by putting StopIteration in the queue - profile = request.params.get( - "profile", self.actionsmap.default_authentication - ) - authenticator = self.actionsmap.get_authenticator(profile) - try: - s_id = authenticator.get_session_cookie()["id"] - queue = self.log_queues[s_id] - except MoulinetteAuthenticationError: - pass - except KeyError: - pass - else: - queue.put(StopIteration) def display(self, message, style="info"): - profile = request.params.get("profile", self.actionsmap.default_authentication) - authenticator = self.actionsmap.get_authenticator(profile) - s_id = authenticator.get_session_cookie(raise_if_no_session_exists=False)["id"] - - try: - queue = self.log_queues[s_id] - except KeyError: - return - - # Put the message as a 2-tuple in the queue - queue.put_nowait((style, message)) - - # Put the current greenlet to sleep for 0 second in order to - # populate the new message in the queue - sleep(0) + pass def prompt(self, *args, **kwargs): raise NotImplementedError("Prompt is not implemented for this interface") @@ -725,9 +644,6 @@ class Interface: Keyword arguments: - routes -- A dict of additional routes to add in the form of {(method, path): callback} - - log_queues -- A LogQueues object or None to retrieve it from - registered logging handlers - """ type = "api" @@ -737,12 +653,6 @@ def __init__(self, routes={}, actionsmap=None, allowed_cors_origins=[]): self.allowed_cors_origins = allowed_cors_origins - # Attempt to retrieve log queues from an APIQueueHandler - handler = log.getHandlersByClass(APIQueueHandler, limit=1) - if handler: - log_queues = handler.queues - handler.actionsmap = actionsmap - # TODO: Return OK to 'OPTIONS' xhr requests (l173) app = Bottle(autojson=True) @@ -785,7 +695,7 @@ def wrapper(*args, **kwargs): app.install(filter_csrf) app.install(cors) app.install(api18n) - actionsmapplugin = _ActionsMapPlugin(actionsmap, log_queues) + actionsmapplugin = _ActionsMapPlugin(actionsmap) app.install(actionsmapplugin) self.authenticate = actionsmapplugin.authenticate @@ -827,11 +737,10 @@ def run(self, host="localhost", port=80): ) try: - from gevent.pywsgi import WSGIServer - from geventwebsocket.handler import WebSocketHandler + from gevent import monkey; monkey.patch_all() + from bottle import GeventServer - server = WSGIServer((host, port), self._app, handler_class=WebSocketHandler) - server.serve_forever() + GeventServer(host, port).run(self._app) except IOError as e: error_message = "unable to start the server instance on %s:%d: %s" % ( host, diff --git a/moulinette/utils/log.py b/moulinette/utils/log.py index b468699f..665fb4eb 100644 --- a/moulinette/utils/log.py +++ b/moulinette/utils/log.py @@ -46,7 +46,6 @@ "loggers": {"moulinette": {"level": "DEBUG", "handlers": ["console"]}}, } - def configure_logging(logging_config=None): """Configure logging with default and optionally given configuration @@ -119,3 +118,41 @@ def findCaller(self, *args): break return rv + + +# Log broadcasting via the broker ----------------------------------------------- + + +# FIXME : hard-coded value about yunohost ... and also we need to secure those file such that they are not public +LOG_BROKER_BACKEND_ENDPOINT = "ipc:///var/run/yunohost/log_broker_backend" +LOG_BROKER_FRONTEND_ENDPOINT = "ipc:///var/run/yunohost/log_broker_frontend" + +if not os.path.isdir("/var/run/yunohost"): + os.mkdir("/var/run/yunohost") +os.chown("/var/run/yunohost", 0, 0) +os.chmod("/var/run/yunohost", 0o700) + +def start_log_broker(): + + from multiprocessing import Process + + def server(): + import zmq + + ctx = zmq.Context() + backend = ctx.socket(zmq.XSUB) + backend.bind(LOG_BROKER_BACKEND_ENDPOINT) + frontend = ctx.socket(zmq.XPUB) + frontend.bind(LOG_BROKER_FRONTEND_ENDPOINT) + + try: + zmq.proxy(frontend, backend) + except KeyboardInterrupt: + pass + + frontend.close() + backend.close() + ctx.term() + + p = Process(target=server) + p.start()