From 2de7d33a8a51beedb06884e99dfacbbe7fe65aa2 Mon Sep 17 00:00:00 2001 From: nkitsaini <74284503+nkitsaini@users.noreply.github.com> Date: Sat, 21 Sep 2024 02:13:37 +0530 Subject: [PATCH] add notebook functions for starting dashboard and cloud functions (#35) Co-authored-by: Ankit Saini --- singlestoredb/apps/__init__.py | 2 + singlestoredb/apps/_cloud_functions.py | 70 ++++++++++++++++++++++++++ singlestoredb/apps/_config.py | 33 ++++++++++++ singlestoredb/apps/_dashboards.py | 51 +++++++++++++++++++ singlestoredb/apps/_process.py | 32 ++++++++++++ singlestoredb/apps/_uvicorn_util.py | 32 ++++++++++++ test-requirements.txt | 2 + 7 files changed, 222 insertions(+) create mode 100644 singlestoredb/apps/__init__.py create mode 100644 singlestoredb/apps/_cloud_functions.py create mode 100644 singlestoredb/apps/_config.py create mode 100644 singlestoredb/apps/_dashboards.py create mode 100644 singlestoredb/apps/_process.py create mode 100644 singlestoredb/apps/_uvicorn_util.py diff --git a/singlestoredb/apps/__init__.py b/singlestoredb/apps/__init__.py new file mode 100644 index 000000000..b9f54898f --- /dev/null +++ b/singlestoredb/apps/__init__.py @@ -0,0 +1,2 @@ +from ._cloud_functions import run_function_app # noqa: F401 +from ._dashboards import run_dashboard_app # noqa: F401 diff --git a/singlestoredb/apps/_cloud_functions.py b/singlestoredb/apps/_cloud_functions.py new file mode 100644 index 000000000..7a168baa5 --- /dev/null +++ b/singlestoredb/apps/_cloud_functions.py @@ -0,0 +1,70 @@ +import asyncio +import typing +import urllib.parse + +from ._config import AppConfig +from ._process import kill_process_by_port + +if typing.TYPE_CHECKING: + from fastapi import FastAPI + from ._uvicorn_util import AwaitableUvicornServer + +# Keep track of currently running server +_running_server: 'typing.Optional[AwaitableUvicornServer]' = None + + +async def run_function_app( + app: 'FastAPI', + log_level: str = 'error', + kill_existing_app_server: bool = True, +) -> None: + + global _running_server + from ._uvicorn_util import AwaitableUvicornServer + + try: + import uvicorn + except ImportError: + raise ImportError('package uvicorn is required to run cloud functions') + try: + import fastapi + except ImportError: + raise ImportError('package fastapi is required to run cloud functions') + + if not isinstance(app, fastapi.FastAPI): + raise TypeError('app is not an instance of FastAPI') + + app_config = AppConfig.from_env() + + if kill_existing_app_server: + # Shutdown the server gracefully if it was started by us. + # Since the uvicorn server doesn't start a new subprocess + # killing the process would result in kernel dying. + if _running_server is not None: + await _running_server.shutdown() + _running_server = None + + # Kill if any other process is occupying the port + kill_process_by_port(app_config.listen_port) + + # Add `GET /` route, used for liveness check + @app.get('/') + def ping() -> str: + return 'Success!' + + base_path = urllib.parse.urlparse(app_config.url).path + app.root_path = base_path + + config = uvicorn.Config( + app, + host='0.0.0.0', + port=app_config.listen_port, + log_level=log_level, + ) + _running_server = AwaitableUvicornServer(config) + + asyncio.create_task(_running_server.serve()) + await _running_server.wait_for_startup() + + if app_config.running_interactively: + print(f'Cloud function available at {app_config.url}') diff --git a/singlestoredb/apps/_config.py b/singlestoredb/apps/_config.py new file mode 100644 index 000000000..2b9c270b1 --- /dev/null +++ b/singlestoredb/apps/_config.py @@ -0,0 +1,33 @@ +import os +from dataclasses import dataclass + + +@dataclass +class AppConfig: + listen_port: int + url: str + running_interactively: bool + + @classmethod + def from_env(cls) -> 'AppConfig': + port = os.environ.get('SINGLESTOREDB_APP_LISTEN_PORT') + if port is None: + raise RuntimeError( + 'Missing SINGLESTOREDB_APP_LISTEN_PORT environment variable. ' + 'Is the code running outside SingleStoreDB notebook environment?', + ) + url = os.environ.get('SINGLESTOREDB_APP_URL') + if url is None: + raise RuntimeError( + 'Missing SINGLESTOREDB_APP_URL environment variable. ' + 'Is the code running outside SingleStoreDB notebook environment?', + ) + + workload_type = os.environ.get('SINGLESTOREDB_WORKLOAD_TYPE') + running_interactively = workload_type == 'InteractiveNotebook' + + return cls( + listen_port=int(port), + url=url, + running_interactively=running_interactively, + ) diff --git a/singlestoredb/apps/_dashboards.py b/singlestoredb/apps/_dashboards.py new file mode 100644 index 000000000..c288b0595 --- /dev/null +++ b/singlestoredb/apps/_dashboards.py @@ -0,0 +1,51 @@ +import typing +import urllib.parse + +from ._config import AppConfig +from ._process import kill_process_by_port + +if typing.TYPE_CHECKING: + from plotly.graph_objs import Figure + + +def run_dashboard_app( + figure: 'Figure', + debug: bool = False, + kill_existing_app_server: bool = True, +) -> None: + try: + import dash + except ImportError: + raise ImportError('package dash is required to run dashboards') + + try: + from plotly.graph_objs import Figure + except ImportError: + raise ImportError('package dash is required to run dashboards') + + if not isinstance(figure, Figure): + raise TypeError('figure is not an instance of plotly Figure') + + app_config = AppConfig.from_env() + + if kill_existing_app_server: + kill_process_by_port(app_config.listen_port) + + base_path = urllib.parse.urlparse(app_config.url).path + + app = dash.Dash(requests_pathname_prefix=base_path) + app.layout = dash.html.Div( + [ + dash.dcc.Graph(figure=figure), + ], + ) + + app.run( + host='0.0.0.0', + debug=debug, + port=str(app_config.listen_port), + jupyter_mode='external', + ) + + if app_config.running_interactively: + print(f'Dash app available at {app_config.url}') diff --git a/singlestoredb/apps/_process.py b/singlestoredb/apps/_process.py new file mode 100644 index 000000000..8ec04810e --- /dev/null +++ b/singlestoredb/apps/_process.py @@ -0,0 +1,32 @@ +import os +import signal +import typing +if typing.TYPE_CHECKING: + from psutil import Process + + +def kill_process_by_port(port: int) -> None: + existing_process = _find_process_by_port(port) + kernel_pid = os.getpid() + # Make sure we are not killing current kernel + if existing_process is not None and kernel_pid != existing_process.pid: + print(f'Killing process {existing_process.pid} which is using port {port}') + os.kill(existing_process.pid, signal.SIGKILL) + + +def _find_process_by_port(port: int) -> 'Process | None': + try: + import psutil + except ImportError: + raise ImportError('package psutil is required') + + for proc in psutil.process_iter(['pid']): + try: + connections = proc.connections() + for conn in connections: + if conn.laddr.port == port: + return proc + except psutil.AccessDenied: + pass + + return None diff --git a/singlestoredb/apps/_uvicorn_util.py b/singlestoredb/apps/_uvicorn_util.py new file mode 100644 index 000000000..7f5c43325 --- /dev/null +++ b/singlestoredb/apps/_uvicorn_util.py @@ -0,0 +1,32 @@ +import asyncio +import socket +from typing import List +from typing import Optional +try: + import uvicorn +except ImportError: + raise ImportError('package uvicorn is required') + + +class AwaitableUvicornServer(uvicorn.Server): + """ + Adds `wait_for_startup` method. + The function (asynchronously) blocks until the server + starts listening or throws an error. + """ + + def __init__(self, config: 'uvicorn.Config') -> None: + super().__init__(config) + self._startup_future = asyncio.get_event_loop().create_future() + + async def startup(self, sockets: Optional[List[socket.socket]] = None) -> None: + try: + result = await super().startup(sockets) + self._startup_future.set_result(True) + return result + except Exception as error: + self._startup_future.set_exception(error) + raise error + + async def wait_for_startup(self) -> None: + await self._startup_future diff --git a/test-requirements.txt b/test-requirements.txt index eab0108c3..d63e274de 100644 --- a/test-requirements.txt +++ b/test-requirements.txt @@ -1,4 +1,6 @@ coverage +dash +fastapi pandas parameterized polars