Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

POC for new log streaming API using a zero-mq broker #333

Open
wants to merge 4 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion debian/control
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ Depends: ${misc:Depends}, ${python3:Depends},
python3-psutil,
python3-tz,
python3-prompt-toolkit,
python3-pygments
python3-pygments,
python3-zeromq
Breaks: yunohost (<< 4.1)
Description: prototype interfaces with ease in Python
Quickly and easily prototype interfaces for your application.
Expand Down
153 changes: 31 additions & 122 deletions moulinette/interfaces/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,47 +65,6 @@ def wrapper(*args, **kwargs):
return wrapper


class LogQueues(dict):
"""Map of session ids to queue."""

pass


class APIQueueHandler(logging.Handler):
"""
A handler class which store logging records into a queue, to be used
and retrieved from the API.
"""

def __init__(self):
logging.Handler.__init__(self)
self.queues = LogQueues()
# actionsmap is actually set during the interface's init ...
self.actionsmap = None

def emit(self, record):
# Prevent triggering this function while moulinette
# is being initialized with --debug
if not self.actionsmap or len(request.cookies) == 0:
return

profile = request.params.get("profile", self.actionsmap.default_authentication)
authenticator = self.actionsmap.get_authenticator(profile)

s_id = authenticator.get_session_cookie(raise_if_no_session_exists=False)["id"]
try:
queue = self.queues[s_id]
except KeyError:
# Session is not initialized, abandon.
return
else:
# Put the message as a 2-tuple in the queue
queue.put_nowait((record.levelname.lower(), record.getMessage()))
# Put the current greenlet to sleep for 0 second in order to
# populate the new message in the queue
sleep(0)


class _HTTPArgumentParser:
"""Argument parser for HTTP requests

Expand Down Expand Up @@ -249,9 +208,8 @@ class _ActionsMapPlugin:
name = "actionsmap"
api = 2

def __init__(self, actionsmap, log_queues={}):
def __init__(self, actionsmap):
self.actionsmap = actionsmap
self.log_queues = log_queues

def setup(self, app):
"""Setup plugin on the application
Expand Down Expand Up @@ -280,11 +238,10 @@ def setup(self, app):
skip=["actionsmap"],
)

# Append messages route
app.route(
"/messages",
name="messages",
callback=self.messages,
"/sse",
name="sse",
callback=self.sse,
skip=["actionsmap"],
)

Expand Down Expand Up @@ -421,46 +378,36 @@ def logout(self):
else:
return m18n.g("logged_in")

def messages(self):
"""Listen to the messages WebSocket stream

Retrieve the WebSocket stream and send to it each messages displayed by
the display method. They are JSON encoded as a dict { style: message }.
"""
def sse(self):
import zmq.green as zmq

profile = request.params.get("profile", self.actionsmap.default_authentication)
authenticator = self.actionsmap.get_authenticator(profile)

s_id = authenticator.get_session_cookie()["id"]
try:
queue = self.log_queues[s_id]
authenticator.get_session_cookie()
except KeyError:
# Create a new queue for the session
queue = Queue()
self.log_queues[s_id] = queue
raise HTTPResponse(m18n.g("not_logged_in"), 401)

wsock = request.environ.get("wsgi.websocket")
if not wsock:
raise HTTPResponse(m18n.g("websocket_request_expected"), 500)
ctx = zmq.Context()
sub = ctx.socket(zmq.SUB)
sub.subscribe('')
sub.connect(log.LOG_BROKER_FRONTEND_ENDPOINT)

while True:
item = queue.get()
try:
# Retrieve the message
style, message = item
except TypeError:
if item == StopIteration:
# Delete the current queue and break
del self.log_queues[s_id]
break
logger.exception("invalid item in the messages queue: %r", item)
else:
try:
# Send the message
wsock.send(json_encode({style: message}))
except WebSocketError:
break
sleep(0)
response.content_type = 'text/event-stream'
response.cache_control = 'no-cache'

# Set client-side auto-reconnect timeout, ms.
yield 'retry: 100\n\n'

try:
while True:
if sub.poll(10, zmq.POLLIN):
_, msg = sub.recv_multipart()
yield 'data: ' + str(msg.decode()) + '\n\n'
finally:
sub.close()
ctx.term()

def process(self, _route, arguments={}):
"""Process the relevant action for the route
Expand Down Expand Up @@ -498,37 +445,9 @@ def process(self, _route, arguments={}):
rmtree(UPLOAD_DIR, True)
UPLOAD_DIR = None

# Close opened WebSocket by putting StopIteration in the queue
profile = request.params.get(
"profile", self.actionsmap.default_authentication
)
authenticator = self.actionsmap.get_authenticator(profile)
try:
s_id = authenticator.get_session_cookie()["id"]
queue = self.log_queues[s_id]
except MoulinetteAuthenticationError:
pass
except KeyError:
pass
else:
queue.put(StopIteration)

def display(self, message, style="info"):
profile = request.params.get("profile", self.actionsmap.default_authentication)
authenticator = self.actionsmap.get_authenticator(profile)
s_id = authenticator.get_session_cookie(raise_if_no_session_exists=False)["id"]

try:
queue = self.log_queues[s_id]
except KeyError:
return

# Put the message as a 2-tuple in the queue
queue.put_nowait((style, message))

# Put the current greenlet to sleep for 0 second in order to
# populate the new message in the queue
sleep(0)
pass

def prompt(self, *args, **kwargs):
raise NotImplementedError("Prompt is not implemented for this interface")
Expand Down Expand Up @@ -725,9 +644,6 @@ class Interface:
Keyword arguments:
- routes -- A dict of additional routes to add in the form of
{(method, path): callback}
- log_queues -- A LogQueues object or None to retrieve it from
registered logging handlers

"""

type = "api"
Expand All @@ -737,12 +653,6 @@ def __init__(self, routes={}, actionsmap=None, allowed_cors_origins=[]):

self.allowed_cors_origins = allowed_cors_origins

# Attempt to retrieve log queues from an APIQueueHandler
handler = log.getHandlersByClass(APIQueueHandler, limit=1)
if handler:
log_queues = handler.queues
handler.actionsmap = actionsmap

# TODO: Return OK to 'OPTIONS' xhr requests (l173)
app = Bottle(autojson=True)

Expand Down Expand Up @@ -785,7 +695,7 @@ def wrapper(*args, **kwargs):
app.install(filter_csrf)
app.install(cors)
app.install(api18n)
actionsmapplugin = _ActionsMapPlugin(actionsmap, log_queues)
actionsmapplugin = _ActionsMapPlugin(actionsmap)
app.install(actionsmapplugin)

self.authenticate = actionsmapplugin.authenticate
Expand Down Expand Up @@ -827,11 +737,10 @@ def run(self, host="localhost", port=80):
)

try:
from gevent.pywsgi import WSGIServer
from geventwebsocket.handler import WebSocketHandler
from gevent import monkey; monkey.patch_all()
from bottle import GeventServer

server = WSGIServer((host, port), self._app, handler_class=WebSocketHandler)
server.serve_forever()
GeventServer(host, port).run(self._app)
except IOError as e:
error_message = "unable to start the server instance on %s:%d: %s" % (
host,
Expand Down
39 changes: 38 additions & 1 deletion moulinette/utils/log.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@
"loggers": {"moulinette": {"level": "DEBUG", "handlers": ["console"]}},
}


def configure_logging(logging_config=None):
"""Configure logging with default and optionally given configuration

Expand Down Expand Up @@ -119,3 +118,41 @@ def findCaller(self, *args):
break

return rv


# Log broadcasting via the broker -----------------------------------------------


# FIXME : hard-coded value about yunohost ... and also we need to secure those file such that they are not public
LOG_BROKER_BACKEND_ENDPOINT = "ipc:///var/run/yunohost/log_broker_backend"
LOG_BROKER_FRONTEND_ENDPOINT = "ipc:///var/run/yunohost/log_broker_frontend"

if not os.path.isdir("/var/run/yunohost"):
os.mkdir("/var/run/yunohost")
os.chown("/var/run/yunohost", 0, 0)
os.chmod("/var/run/yunohost", 0o700)

def start_log_broker():

from multiprocessing import Process

def server():
import zmq

ctx = zmq.Context()
backend = ctx.socket(zmq.XSUB)
backend.bind(LOG_BROKER_BACKEND_ENDPOINT)
frontend = ctx.socket(zmq.XPUB)
frontend.bind(LOG_BROKER_FRONTEND_ENDPOINT)

try:
zmq.proxy(frontend, backend)
except KeyboardInterrupt:
pass

frontend.close()
backend.close()
ctx.term()

p = Process(target=server)
p.start()
Loading