Skip to content

Commit

Permalink
add notebook functions for starting dashboard and cloud functions (#35)
Browse files Browse the repository at this point in the history
Co-authored-by: Ankit Saini <[email protected]>
  • Loading branch information
nkitsaini and nkitsaini authored Sep 20, 2024
1 parent f268ffc commit 2de7d33
Show file tree
Hide file tree
Showing 7 changed files with 222 additions and 0 deletions.
2 changes: 2 additions & 0 deletions singlestoredb/apps/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
from ._cloud_functions import run_function_app # noqa: F401
from ._dashboards import run_dashboard_app # noqa: F401
70 changes: 70 additions & 0 deletions singlestoredb/apps/_cloud_functions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
import asyncio
import typing
import urllib.parse

from ._config import AppConfig
from ._process import kill_process_by_port

if typing.TYPE_CHECKING:
from fastapi import FastAPI
from ._uvicorn_util import AwaitableUvicornServer

# Keep track of currently running server
_running_server: 'typing.Optional[AwaitableUvicornServer]' = None


async def run_function_app(
app: 'FastAPI',
log_level: str = 'error',
kill_existing_app_server: bool = True,
) -> None:

global _running_server
from ._uvicorn_util import AwaitableUvicornServer

try:
import uvicorn
except ImportError:
raise ImportError('package uvicorn is required to run cloud functions')
try:
import fastapi
except ImportError:
raise ImportError('package fastapi is required to run cloud functions')

if not isinstance(app, fastapi.FastAPI):
raise TypeError('app is not an instance of FastAPI')

app_config = AppConfig.from_env()

if kill_existing_app_server:
# Shutdown the server gracefully if it was started by us.
# Since the uvicorn server doesn't start a new subprocess
# killing the process would result in kernel dying.
if _running_server is not None:
await _running_server.shutdown()
_running_server = None

# Kill if any other process is occupying the port
kill_process_by_port(app_config.listen_port)

# Add `GET /` route, used for liveness check
@app.get('/')
def ping() -> str:
return 'Success!'

base_path = urllib.parse.urlparse(app_config.url).path
app.root_path = base_path

config = uvicorn.Config(
app,
host='0.0.0.0',
port=app_config.listen_port,
log_level=log_level,
)
_running_server = AwaitableUvicornServer(config)

asyncio.create_task(_running_server.serve())
await _running_server.wait_for_startup()

if app_config.running_interactively:
print(f'Cloud function available at {app_config.url}')
33 changes: 33 additions & 0 deletions singlestoredb/apps/_config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import os
from dataclasses import dataclass


@dataclass
class AppConfig:
listen_port: int
url: str
running_interactively: bool

@classmethod
def from_env(cls) -> 'AppConfig':
port = os.environ.get('SINGLESTOREDB_APP_LISTEN_PORT')
if port is None:
raise RuntimeError(
'Missing SINGLESTOREDB_APP_LISTEN_PORT environment variable. '
'Is the code running outside SingleStoreDB notebook environment?',
)
url = os.environ.get('SINGLESTOREDB_APP_URL')
if url is None:
raise RuntimeError(
'Missing SINGLESTOREDB_APP_URL environment variable. '
'Is the code running outside SingleStoreDB notebook environment?',
)

workload_type = os.environ.get('SINGLESTOREDB_WORKLOAD_TYPE')
running_interactively = workload_type == 'InteractiveNotebook'

return cls(
listen_port=int(port),
url=url,
running_interactively=running_interactively,
)
51 changes: 51 additions & 0 deletions singlestoredb/apps/_dashboards.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
import typing
import urllib.parse

from ._config import AppConfig
from ._process import kill_process_by_port

if typing.TYPE_CHECKING:
from plotly.graph_objs import Figure


def run_dashboard_app(
figure: 'Figure',
debug: bool = False,
kill_existing_app_server: bool = True,
) -> None:
try:
import dash
except ImportError:
raise ImportError('package dash is required to run dashboards')

try:
from plotly.graph_objs import Figure
except ImportError:
raise ImportError('package dash is required to run dashboards')

if not isinstance(figure, Figure):
raise TypeError('figure is not an instance of plotly Figure')

app_config = AppConfig.from_env()

if kill_existing_app_server:
kill_process_by_port(app_config.listen_port)

base_path = urllib.parse.urlparse(app_config.url).path

app = dash.Dash(requests_pathname_prefix=base_path)
app.layout = dash.html.Div(
[
dash.dcc.Graph(figure=figure),
],
)

app.run(
host='0.0.0.0',
debug=debug,
port=str(app_config.listen_port),
jupyter_mode='external',
)

if app_config.running_interactively:
print(f'Dash app available at {app_config.url}')
32 changes: 32 additions & 0 deletions singlestoredb/apps/_process.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import os
import signal
import typing
if typing.TYPE_CHECKING:
from psutil import Process


def kill_process_by_port(port: int) -> None:
existing_process = _find_process_by_port(port)
kernel_pid = os.getpid()
# Make sure we are not killing current kernel
if existing_process is not None and kernel_pid != existing_process.pid:
print(f'Killing process {existing_process.pid} which is using port {port}')
os.kill(existing_process.pid, signal.SIGKILL)


def _find_process_by_port(port: int) -> 'Process | None':
try:
import psutil
except ImportError:
raise ImportError('package psutil is required')

for proc in psutil.process_iter(['pid']):
try:
connections = proc.connections()
for conn in connections:
if conn.laddr.port == port:
return proc
except psutil.AccessDenied:
pass

return None
32 changes: 32 additions & 0 deletions singlestoredb/apps/_uvicorn_util.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import asyncio
import socket
from typing import List
from typing import Optional
try:
import uvicorn
except ImportError:
raise ImportError('package uvicorn is required')


class AwaitableUvicornServer(uvicorn.Server):
"""
Adds `wait_for_startup` method.
The function (asynchronously) blocks until the server
starts listening or throws an error.
"""

def __init__(self, config: 'uvicorn.Config') -> None:
super().__init__(config)
self._startup_future = asyncio.get_event_loop().create_future()

async def startup(self, sockets: Optional[List[socket.socket]] = None) -> None:
try:
result = await super().startup(sockets)
self._startup_future.set_result(True)
return result
except Exception as error:
self._startup_future.set_exception(error)
raise error

async def wait_for_startup(self) -> None:
await self._startup_future
2 changes: 2 additions & 0 deletions test-requirements.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
coverage
dash
fastapi
pandas
parameterized
polars
Expand Down

0 comments on commit 2de7d33

Please sign in to comment.