Skip to content

Commit

Permalink
add airflow ui-api command (apache#41896)
Browse files Browse the repository at this point in the history
add port binding

Add tests

Add ui-api to airflow standalone

Fix bad descriptor error on shutting down the worker
  • Loading branch information
pierrejeambrun authored Sep 2, 2024
1 parent 351961c commit c45be77
Show file tree
Hide file tree
Showing 19 changed files with 532 additions and 23 deletions.
3 changes: 2 additions & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -581,8 +581,9 @@ repos:
(?x)
^airflow/api_connexion/openapi/v1.yaml$|
^airflow/ui/openapi-gen/|
^airflow/cli/commands/webserver_command.py$|
^airflow/cli/commands/internal_api_command.py$|
^airflow/cli/commands/ui_api_command.py$|
^airflow/cli/commands/webserver_command.py$|
^airflow/config_templates/|
^airflow/models/baseoperator.py$|
^airflow/operators/__init__.py$|
Expand Down
31 changes: 30 additions & 1 deletion airflow/api_ui/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@
# under the License.
from __future__ import annotations

from fastapi import FastAPI
from fastapi import APIRouter, FastAPI

from airflow.www.extensions.init_dagbag import get_dag_bag

app: FastAPI | None = None


def init_dag_bag(app: FastAPI) -> None:
"""
Expand All @@ -39,4 +41,31 @@ def create_app() -> FastAPI:

init_dag_bag(app)

init_views(app)

return app


def init_views(app) -> None:
"""Init views by registering the different routers."""
from airflow.api_ui.views.datasets import dataset_router

root_router = APIRouter(prefix="/ui")

root_router.include_router(dataset_router)

app.include_router(root_router)


def cached_app(config=None, testing=False):
"""Return cached instance of Airflow UI app."""
global app
if not app:
app = create_app()
return app


def purge_cached_app():
"""Remove the cached version of the app in global state."""
global app
app = None
33 changes: 33 additions & 0 deletions airflow/api_ui/gunicorn_config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
#!/usr/bin/env python
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

import setproctitle

from airflow import settings


def post_worker_init(_):
"""
Set process title.
This is used by airflow.cli.commands.ui_api_command to track the status of the worker.
"""
old_title = setproctitle.getproctitle()
setproctitle.setproctitle(settings.GUNICORN_WORKER_READY_PREFIX + old_title)
14 changes: 2 additions & 12 deletions airflow/api_ui/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,6 @@

from __future__ import annotations

from fastapi import APIRouter
from airflow.api_ui.app import cached_app

from airflow.api_ui.app import create_app
from airflow.api_ui.views.datasets import dataset_router

app = create_app()

root_router = APIRouter(prefix="/ui")

root_router.include_router(dataset_router)


app.include_router(root_router)
app = cached_app()
65 changes: 63 additions & 2 deletions airflow/cli/cli_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -782,6 +782,45 @@ def string_lower_type(val):
help="The access log format for gunicorn logs",
)


# ui-api
ARG_UI_API_PORT = Arg(
("-p", "--port"),
default=9091,
type=int,
help="The port on which to run the server",
)
ARG_UI_API_WORKERS = Arg(
("-w", "--workers"),
default=4,
type=int,
help="Number of workers to run the UI API-on",
)
ARG_UI_API_WORKER_TIMEOUT = Arg(
("-t", "--worker-timeout"),
default=120,
type=int,
help="The timeout for waiting on UI API workers",
)
ARG_UI_API_HOSTNAME = Arg(
("-H", "--hostname"),
default="0.0.0.0", # nosec
help="Set the hostname on which to run the web server",
)
ARG_UI_API_ACCESS_LOGFILE = Arg(
("-A", "--access-logfile"),
help="The logfile to store the access log. Use '-' to print to stdout",
)
ARG_UI_API_ERROR_LOGFILE = Arg(
("-E", "--error-logfile"),
help="The logfile to store the error log. Use '-' to print to stderr",
)
ARG_UI_API_ACCESS_LOGFORMAT = Arg(
("-L", "--access-logformat"),
help="The access log format for gunicorn logs",
)


# scheduler
ARG_NUM_RUNS = Arg(
("-n", "--num-runs"),
Expand Down Expand Up @@ -1923,7 +1962,7 @@ class GroupCommand(NamedTuple):
),
ActionCommand(
name="webserver",
help="Start a Airflow webserver instance",
help="Start an Airflow webserver instance",
func=lazy_load_command("airflow.cli.commands.webserver_command.webserver"),
args=(
ARG_PORT,
Expand All @@ -1944,6 +1983,28 @@ class GroupCommand(NamedTuple):
ARG_DEBUG,
),
),
ActionCommand(
name="ui-api",
help="Start an Airflow UI API instance",
func=lazy_load_command("airflow.cli.commands.ui_api_command.ui_api"),
args=(
ARG_UI_API_PORT,
ARG_UI_API_WORKERS,
ARG_UI_API_WORKER_TIMEOUT,
ARG_UI_API_HOSTNAME,
ARG_PID,
ARG_DAEMON,
ARG_STDOUT,
ARG_STDERR,
ARG_UI_API_ACCESS_LOGFILE,
ARG_UI_API_ERROR_LOGFILE,
ARG_UI_API_ACCESS_LOGFORMAT,
ARG_LOG_FILE,
ARG_SSL_CERT,
ARG_SSL_KEY,
ARG_DEBUG,
),
),
ActionCommand(
name="scheduler",
help="Start a scheduler instance",
Expand Down Expand Up @@ -2063,7 +2124,7 @@ class GroupCommand(NamedTuple):
core_commands.append(
ActionCommand(
name="internal-api",
help="Start a Airflow Internal API instance",
help="Start an Airflow Internal API instance",
func=lazy_load_command("airflow.cli.commands.internal_api_command.internal_api"),
args=(
ARG_INTERNAL_API_PORT,
Expand Down
7 changes: 7 additions & 0 deletions airflow/cli/commands/standalone_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,12 @@ def run(self):
command=["webserver"],
env=env,
)
self.subcommands["ui-api"] = SubCommand(
self,
name="ui-api",
command=["ui-api"],
env=env,
)
self.subcommands["triggerer"] = SubCommand(
self,
name="triggerer",
Expand Down Expand Up @@ -136,6 +142,7 @@ def print_output(self, name: str, output):
You can pass multiple lines to output if you wish; it will be split for you.
"""
color = {
"ui-api": "magenta",
"webserver": "green",
"scheduler": "blue",
"triggerer": "cyan",
Expand Down
Loading

0 comments on commit c45be77

Please sign in to comment.