From c45be77a7db602cb401217079e67291689f25c15 Mon Sep 17 00:00:00 2001 From: Pierre Jeambrun Date: Mon, 2 Sep 2024 12:39:21 +0200 Subject: [PATCH] add airflow ui-api command (#41896) add port binding Add tests Add ui-api to airflow standalone Fix bad descriptor error on shutting down the worker --- .pre-commit-config.yaml | 3 +- airflow/api_ui/app.py | 31 ++- airflow/api_ui/gunicorn_config.py | 33 +++ airflow/api_ui/main.py | 14 +- airflow/cli/cli_config.py | 65 +++++- airflow/cli/commands/standalone_command.py | 7 + airflow/cli/commands/ui_api_command.py | 201 ++++++++++++++++++ .../03_contributors_quick_start.rst | 2 + dev/breeze/doc/03_developer_tasks.rst | 3 + .../src/airflow_breeze/global_constants.py | 1 + .../src/airflow_breeze/params/shell_params.py | 2 + .../src/airflow_breeze/utils/visuals.py | 3 + docs/docker-stack/entrypoint.rst | 2 +- scripts/ci/docker-compose/base-ports.yml | 1 + scripts/in_container/bin/run_tmux | 11 +- tests/cli/commands/_common_cli_classes.py | 2 +- .../cli/commands/test_internal_api_command.py | 4 +- tests/cli/commands/test_ui_api_command.py | 166 +++++++++++++++ tests/cli/commands/test_webserver_command.py | 4 +- 19 files changed, 532 insertions(+), 23 deletions(-) create mode 100644 airflow/api_ui/gunicorn_config.py create mode 100644 airflow/cli/commands/ui_api_command.py create mode 100644 tests/cli/commands/test_ui_api_command.py diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index fb5ef7421ac35..44aa21db5284c 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -581,8 +581,9 @@ repos: (?x) ^airflow/api_connexion/openapi/v1.yaml$| ^airflow/ui/openapi-gen/| - ^airflow/cli/commands/webserver_command.py$| ^airflow/cli/commands/internal_api_command.py$| + ^airflow/cli/commands/ui_api_command.py$| + ^airflow/cli/commands/webserver_command.py$| ^airflow/config_templates/| ^airflow/models/baseoperator.py$| ^airflow/operators/__init__.py$| diff --git a/airflow/api_ui/app.py b/airflow/api_ui/app.py index 2307e0eea32b1..b279f6c90d25e 100644 --- a/airflow/api_ui/app.py +++ b/airflow/api_ui/app.py @@ -16,10 +16,12 @@ # under the License. from __future__ import annotations -from fastapi import FastAPI +from fastapi import APIRouter, FastAPI from airflow.www.extensions.init_dagbag import get_dag_bag +app: FastAPI | None = None + def init_dag_bag(app: FastAPI) -> None: """ @@ -39,4 +41,31 @@ def create_app() -> FastAPI: init_dag_bag(app) + init_views(app) + + return app + + +def init_views(app) -> None: + """Init views by registering the different routers.""" + from airflow.api_ui.views.datasets import dataset_router + + root_router = APIRouter(prefix="/ui") + + root_router.include_router(dataset_router) + + app.include_router(root_router) + + +def cached_app(config=None, testing=False): + """Return cached instance of Airflow UI app.""" + global app + if not app: + app = create_app() return app + + +def purge_cached_app(): + """Remove the cached version of the app in global state.""" + global app + app = None diff --git a/airflow/api_ui/gunicorn_config.py b/airflow/api_ui/gunicorn_config.py new file mode 100644 index 0000000000000..3ee3ba5e7cc2b --- /dev/null +++ b/airflow/api_ui/gunicorn_config.py @@ -0,0 +1,33 @@ +#!/usr/bin/env python +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. 
The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import setproctitle
+
+from airflow import settings
+
+
+def post_worker_init(_):
+    """
+    Set process title.
+
+    This is used by airflow.cli.commands.ui_api_command to track the status of the worker.
+    """
+    old_title = setproctitle.getproctitle()
+    setproctitle.setproctitle(settings.GUNICORN_WORKER_READY_PREFIX + old_title)
diff --git a/airflow/api_ui/main.py b/airflow/api_ui/main.py
index 05798974a8458..175db6b9271d1 100644
--- a/airflow/api_ui/main.py
+++ b/airflow/api_ui/main.py
@@ -17,16 +17,6 @@
 
 from __future__ import annotations
 
-from fastapi import APIRouter
+from airflow.api_ui.app import cached_app
 
-from airflow.api_ui.app import create_app
-from airflow.api_ui.views.datasets import dataset_router
-
-app = create_app()
-
-root_router = APIRouter(prefix="/ui")
-
-root_router.include_router(dataset_router)
-
-
-app.include_router(root_router)
+app = cached_app()
diff --git a/airflow/cli/cli_config.py b/airflow/cli/cli_config.py
index 508a18263d48e..a90fda69ff476 100644
--- a/airflow/cli/cli_config.py
+++ b/airflow/cli/cli_config.py
@@ -782,6 +782,45 @@ def string_lower_type(val):
     help="The access log format for gunicorn logs",
 )
 
+
+# ui-api
+ARG_UI_API_PORT = Arg(
+    ("-p", "--port"),
+    default=9091,
+    type=int,
+    help="The port on which to run the server",
+)
+ARG_UI_API_WORKERS = Arg(
+    ("-w", "--workers"),
+    default=4,
+    type=int,
+    help="Number of workers to run the UI API on",
+)
+ARG_UI_API_WORKER_TIMEOUT = Arg(
+    ("-t", "--worker-timeout"),
+    default=120,
+    type=int,
+    help="The timeout for waiting on UI API workers",
+)
+ARG_UI_API_HOSTNAME = Arg(
+    ("-H", "--hostname"),
+    default="0.0.0.0",  # nosec
+    help="Set the hostname on which to run the UI API server",
+)
+ARG_UI_API_ACCESS_LOGFILE = Arg(
+    ("-A", "--access-logfile"),
+    help="The logfile to store the access log. Use '-' to print to stdout",
+)
+ARG_UI_API_ERROR_LOGFILE = Arg(
+    ("-E", "--error-logfile"),
+    help="The logfile to store the error log. 
Use '-' to print to stderr", +) +ARG_UI_API_ACCESS_LOGFORMAT = Arg( + ("-L", "--access-logformat"), + help="The access log format for gunicorn logs", +) + + # scheduler ARG_NUM_RUNS = Arg( ("-n", "--num-runs"), @@ -1923,7 +1962,7 @@ class GroupCommand(NamedTuple): ), ActionCommand( name="webserver", - help="Start a Airflow webserver instance", + help="Start an Airflow webserver instance", func=lazy_load_command("airflow.cli.commands.webserver_command.webserver"), args=( ARG_PORT, @@ -1944,6 +1983,28 @@ class GroupCommand(NamedTuple): ARG_DEBUG, ), ), + ActionCommand( + name="ui-api", + help="Start an Airflow UI API instance", + func=lazy_load_command("airflow.cli.commands.ui_api_command.ui_api"), + args=( + ARG_UI_API_PORT, + ARG_UI_API_WORKERS, + ARG_UI_API_WORKER_TIMEOUT, + ARG_UI_API_HOSTNAME, + ARG_PID, + ARG_DAEMON, + ARG_STDOUT, + ARG_STDERR, + ARG_UI_API_ACCESS_LOGFILE, + ARG_UI_API_ERROR_LOGFILE, + ARG_UI_API_ACCESS_LOGFORMAT, + ARG_LOG_FILE, + ARG_SSL_CERT, + ARG_SSL_KEY, + ARG_DEBUG, + ), + ), ActionCommand( name="scheduler", help="Start a scheduler instance", @@ -2063,7 +2124,7 @@ class GroupCommand(NamedTuple): core_commands.append( ActionCommand( name="internal-api", - help="Start a Airflow Internal API instance", + help="Start an Airflow Internal API instance", func=lazy_load_command("airflow.cli.commands.internal_api_command.internal_api"), args=( ARG_INTERNAL_API_PORT, diff --git a/airflow/cli/commands/standalone_command.py b/airflow/cli/commands/standalone_command.py index 11b8a852ba004..1f2bf5c9e9b2c 100644 --- a/airflow/cli/commands/standalone_command.py +++ b/airflow/cli/commands/standalone_command.py @@ -80,6 +80,12 @@ def run(self): command=["webserver"], env=env, ) + self.subcommands["ui-api"] = SubCommand( + self, + name="ui-api", + command=["ui-api"], + env=env, + ) self.subcommands["triggerer"] = SubCommand( self, name="triggerer", @@ -136,6 +142,7 @@ def print_output(self, name: str, output): You can pass multiple lines to output if you wish; it will be split for you. """ color = { + "ui-api": "magenta", "webserver": "green", "scheduler": "blue", "triggerer": "cyan", diff --git a/airflow/cli/commands/ui_api_command.py b/airflow/cli/commands/ui_api_command.py new file mode 100644 index 0000000000000..cacc1fd7d487c --- /dev/null +++ b/airflow/cli/commands/ui_api_command.py @@ -0,0 +1,201 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+"""UI API command.""" + +from __future__ import annotations + +import logging +import os +import signal +import subprocess +import sys +import textwrap +from contextlib import suppress +from pathlib import Path +from time import sleep +from typing import NoReturn + +import psutil +from lockfile.pidlockfile import read_pid_from_pidfile +from uvicorn.workers import UvicornWorker + +from airflow import settings +from airflow.cli.commands.daemon_utils import run_command_with_daemon_option +from airflow.cli.commands.webserver_command import GunicornMonitor +from airflow.utils import cli as cli_utils +from airflow.utils.cli import setup_locations +from airflow.utils.providers_configuration_loader import providers_configuration_loaded + +log = logging.getLogger(__name__) + + +# This shouldn't be necessary but there seems to be an issue in uvloop that causes bad file descriptor +# errors when shutting down workers. Despite the 'closed' status of the issue it is not solved, +# more info here: https://github.com/benoitc/gunicorn/issues/1877#issuecomment-1911136399 +AirflowUvicornWorker = UvicornWorker +AirflowUvicornWorker.CONFIG_KWARGS = {"loop": "asyncio", "http": "auto"} + + +@cli_utils.action_cli +@providers_configuration_loaded +def ui_api(args): + """Start Airflow UI API.""" + print(settings.HEADER) + + access_logfile = args.access_logfile or "-" + error_logfile = args.error_logfile or "-" + access_logformat = args.access_logformat + num_workers = args.workers + worker_timeout = args.worker_timeout + + worker_class = "airflow.cli.commands.ui_api_command.AirflowUvicornWorker" + + from airflow.api_ui.app import create_app + + if args.debug: + print(f"Starting the UI API server on port {args.port} and host {args.hostname} debug.") + log.warning("Running in dev mode, ignoring gunicorn args") + + run_args = [ + "fastapi", + "dev", + "airflow/api_ui/main.py", + "--port", + str(args.port), + "--host", + str(args.hostname), + ] + + with subprocess.Popen( + run_args, + close_fds=True, + ) as process: + process.wait() + else: + log.info( + textwrap.dedent( + f"""\ + Running the Gunicorn Server with: + Workers: {num_workers} {worker_class} + Host: {args.hostname}:{args.port} + Timeout: {worker_timeout} + Logfiles: {access_logfile} {error_logfile} + Access Logformat: {access_logformat} + =================================================================""" + ) + ) + + pid_file, _, _, _ = setup_locations("ui-api", pid=args.pid) + run_args = [ + sys.executable, + "-m", + "gunicorn", + "--workers", + str(num_workers), + "--worker-class", + str(worker_class), + "--timeout", + str(worker_timeout), + "--bind", + args.hostname + ":" + str(args.port), + "--name", + "airflow-ui-api", + "--pid", + pid_file, + "--access-logfile", + str(access_logfile), + "--error-logfile", + str(error_logfile), + "--config", + "python:airflow.api_ui.gunicorn_config", + ] + + if args.access_logformat and args.access_logformat.strip(): + run_args += ["--access-logformat", str(args.access_logformat)] + + if args.daemon: + run_args += ["--daemon"] + + run_args += ["airflow.api_ui.app:cached_app()"] + + # To prevent different workers creating the web app and + # all writing to the database at the same time, we use the --preload option. + # With the preload option, the app is loaded before the workers are forked, and each worker will + # then have a copy of the app + run_args += ["--preload"] + + def kill_proc(signum: int, gunicorn_master_proc: psutil.Process | subprocess.Popen) -> NoReturn: + log.info("Received signal: %s. 
Closing gunicorn.", signum) + gunicorn_master_proc.terminate() + with suppress(TimeoutError): + gunicorn_master_proc.wait(timeout=30) + if isinstance(gunicorn_master_proc, subprocess.Popen): + still_running = gunicorn_master_proc.poll() is not None + else: + still_running = gunicorn_master_proc.is_running() + if still_running: + gunicorn_master_proc.kill() + sys.exit(0) + + def monitor_gunicorn(gunicorn_master_proc: psutil.Process | subprocess.Popen) -> NoReturn: + # Register signal handlers + signal.signal(signal.SIGINT, lambda signum, _: kill_proc(signum, gunicorn_master_proc)) + signal.signal(signal.SIGTERM, lambda signum, _: kill_proc(signum, gunicorn_master_proc)) + + # These run forever until SIG{INT, TERM, KILL, ...} signal is sent + GunicornMonitor( + gunicorn_master_pid=gunicorn_master_proc.pid, + num_workers_expected=num_workers, + master_timeout=120, + worker_refresh_interval=30, + worker_refresh_batch_size=1, + reload_on_plugin_change=False, + ).start() + + def start_and_monitor_gunicorn(args): + if args.daemon: + subprocess.Popen(run_args, close_fds=True) + + # Reading pid of gunicorn master as it will be different that + # the one of process spawned above. + gunicorn_master_proc_pid = None + while not gunicorn_master_proc_pid: + sleep(0.1) + gunicorn_master_proc_pid = read_pid_from_pidfile(pid_file) + + # Run Gunicorn monitor + gunicorn_master_proc = psutil.Process(gunicorn_master_proc_pid) + monitor_gunicorn(gunicorn_master_proc) + else: + with subprocess.Popen(run_args, close_fds=True) as gunicorn_master_proc: + monitor_gunicorn(gunicorn_master_proc) + + if args.daemon: + # This makes possible errors get reported before daemonization + os.environ["SKIP_DAGS_PARSING"] = "True" + create_app() + os.environ.pop("SKIP_DAGS_PARSING") + + pid_file_path = Path(pid_file) + monitor_pid_file = str(pid_file_path.with_name(f"{pid_file_path.stem}-monitor{pid_file_path.suffix}")) + run_command_with_daemon_option( + args=args, + process_name="ui-api", + callback=lambda: start_and_monitor_gunicorn(args), + should_setup_logging=True, + pid_file=monitor_pid_file, + ) diff --git a/contributing-docs/03_contributors_quick_start.rst b/contributing-docs/03_contributors_quick_start.rst index eb84bb668a78b..b7467d6c4a16b 100644 --- a/contributing-docs/03_contributors_quick_start.rst +++ b/contributing-docs/03_contributors_quick_start.rst @@ -335,6 +335,7 @@ Using Breeze Ports are forwarded to the running docker containers for webserver and database * 12322 -> forwarded to Airflow ssh server -> airflow:22 * 28080 -> forwarded to Airflow webserver -> airflow:8080 + * 29091 -> forwarded to Airflow UI API -> airflow:9091 * 25555 -> forwarded to Flower dashboard -> airflow:5555 * 25433 -> forwarded to Postgres database -> postgres:5432 * 23306 -> forwarded to MySQL database -> mysql:3306 @@ -343,6 +344,7 @@ Using Breeze Here are links to those services that you can use on host: * ssh connection for remote debugging: ssh -p 12322 airflow@127.0.0.1 (password: airflow) * Webserver: http://127.0.0.1:28080 + * UI API: http://127.0.0.1:29091 * Flower: http://127.0.0.1:25555 * Postgres: jdbc:postgresql://127.0.0.1:25433/airflow?user=postgres&password=airflow * Mysql: jdbc:mysql://127.0.0.1:23306/airflow?user=root diff --git a/dev/breeze/doc/03_developer_tasks.rst b/dev/breeze/doc/03_developer_tasks.rst index b23bf0a7a8baf..4acfdb4627849 100644 --- a/dev/breeze/doc/03_developer_tasks.rst +++ b/dev/breeze/doc/03_developer_tasks.rst @@ -113,6 +113,7 @@ When you run Airflow Breeze, the following ports are 
automatically forwarded: * 12322 -> forwarded to Airflow ssh server -> airflow:22 * 28080 -> forwarded to Airflow webserver -> airflow:8080 + * 29091 -> forwarded to Airflow UI API -> airflow:9091 * 25555 -> forwarded to Flower dashboard -> airflow:5555 * 25433 -> forwarded to Postgres database -> postgres:5432 * 23306 -> forwarded to MySQL database -> mysql:3306 @@ -125,6 +126,7 @@ You can connect to these ports/databases using: * ssh connection for remote debugging: ssh -p 12322 airflow@127.0.0.1 pw: airflow * Webserver: http://127.0.0.1:28080 + * UI API: http://127.0.0.1:29091 * Flower: http://127.0.0.1:25555 * Postgres: jdbc:postgresql://127.0.0.1:25433/airflow?user=postgres&password=airflow * Mysql: jdbc:mysql://127.0.0.1:23306/airflow?user=root @@ -154,6 +156,7 @@ You can change the used host port numbers by setting appropriate environment var * ``SSH_PORT`` * ``WEBSERVER_HOST_PORT`` +* ``UI_API_HOST_PORT`` * ``POSTGRES_HOST_PORT`` * ``MYSQL_HOST_PORT`` * ``MSSQL_HOST_PORT`` diff --git a/dev/breeze/src/airflow_breeze/global_constants.py b/dev/breeze/src/airflow_breeze/global_constants.py index fadd2b154caf2..6f6545bf8f67d 100644 --- a/dev/breeze/src/airflow_breeze/global_constants.py +++ b/dev/breeze/src/airflow_breeze/global_constants.py @@ -253,6 +253,7 @@ def get_default_platform_machine() -> str: SSH_PORT = "12322" WEBSERVER_HOST_PORT = "28080" VITE_DEV_PORT = "5173" +UI_API_HOST_PORT = "29091" CELERY_BROKER_URLS_MAP = {"rabbitmq": "amqp://guest:guest@rabbitmq:5672", "redis": "redis://redis:6379/0"} SQLITE_URL = "sqlite:////root/airflow/sqlite/airflow.db" diff --git a/dev/breeze/src/airflow_breeze/params/shell_params.py b/dev/breeze/src/airflow_breeze/params/shell_params.py index 11f12d4a1fb09..150bad0d0b6ce 100644 --- a/dev/breeze/src/airflow_breeze/params/shell_params.py +++ b/dev/breeze/src/airflow_breeze/params/shell_params.py @@ -52,6 +52,7 @@ SSH_PORT, START_AIRFLOW_DEFAULT_ALLOWED_EXECUTOR, TESTABLE_INTEGRATIONS, + UI_API_HOST_PORT, USE_AIRFLOW_MOUNT_SOURCES, WEBSERVER_HOST_PORT, GithubEvents, @@ -572,6 +573,7 @@ def env_variables_for_docker_commands(self) -> dict[str, str]: _set_var(_env, "VERBOSE_COMMANDS", self.verbose_commands) _set_var(_env, "VERSION_SUFFIX_FOR_PYPI", self.version_suffix_for_pypi) _set_var(_env, "WEBSERVER_HOST_PORT", None, WEBSERVER_HOST_PORT) + _set_var(_env, "UI_API_HOST_PORT", None, UI_API_HOST_PORT) _set_var(_env, "_AIRFLOW_RUN_DB_TESTS_ONLY", self.run_db_tests_only) _set_var(_env, "_AIRFLOW_SKIP_DB_TESTS", self.skip_db_tests) self._generate_env_for_docker_compose_file_if_needed(_env) diff --git a/dev/breeze/src/airflow_breeze/utils/visuals.py b/dev/breeze/src/airflow_breeze/utils/visuals.py index dfd0f4bdb5acf..a43182713f1d1 100644 --- a/dev/breeze/src/airflow_breeze/utils/visuals.py +++ b/dev/breeze/src/airflow_breeze/utils/visuals.py @@ -26,6 +26,7 @@ POSTGRES_HOST_PORT, REDIS_HOST_PORT, SSH_PORT, + UI_API_HOST_PORT, WEBSERVER_HOST_PORT, ) from airflow_breeze.utils.path_utils import AIRFLOW_SOURCES_ROOT @@ -82,6 +83,7 @@ Ports are forwarded to the running docker containers for webserver and database * {SSH_PORT} -> forwarded to Airflow ssh server -> airflow:22 * {WEBSERVER_HOST_PORT} -> forwarded to Airflow webserver -> airflow:8080 + * {UI_API_HOST_PORT} -> forwarded to Airflow UI API -> airflow:9091 * {FLOWER_HOST_PORT} -> forwarded to Flower dashboard -> airflow:5555 * {POSTGRES_HOST_PORT} -> forwarded to Postgres database -> postgres:5432 * {MYSQL_HOST_PORT} -> forwarded to MySQL database -> mysql:3306 @@ -91,6 +93,7 @@ * ssh connection 
for remote debugging: ssh -p {SSH_PORT} airflow@127.0.0.1 (password: airflow)
   * Webserver: http://127.0.0.1:{WEBSERVER_HOST_PORT}
+  * UI API: http://127.0.0.1:{UI_API_HOST_PORT}
   * Flower: http://127.0.0.1:{FLOWER_HOST_PORT}
   * Postgres: jdbc:postgresql://127.0.0.1:{POSTGRES_HOST_PORT}/airflow?user=postgres&password=airflow
   * Mysql: jdbc:mysql://127.0.0.1:{MYSQL_HOST_PORT}/airflow?user=root
diff --git a/docs/docker-stack/entrypoint.rst b/docs/docker-stack/entrypoint.rst
index 2933353f0c993..65a2b92b573e4 100644
--- a/docs/docker-stack/entrypoint.rst
+++ b/docs/docker-stack/entrypoint.rst
@@ -189,7 +189,7 @@ If there are any other arguments - they are simply passed to the "airflow" comma
     scheduler      Start a scheduler instance
     sync-perm      Update permissions for existing roles and optionally DAGs
     version        Show the version
-    webserver      Start a Airflow webserver instance
+    webserver      Start an Airflow webserver instance
 
 optional arguments:
   -h, --help      show this help message and exit
diff --git a/scripts/ci/docker-compose/base-ports.yml b/scripts/ci/docker-compose/base-ports.yml
index c68e3b24849da..050aaad2dce4f 100644
--- a/scripts/ci/docker-compose/base-ports.yml
+++ b/scripts/ci/docker-compose/base-ports.yml
@@ -20,4 +20,5 @@ services:
     ports:
       - "${SSH_PORT}:22"
       - "${WEBSERVER_HOST_PORT}:8080"
+      - "${UI_API_HOST_PORT}:9091"
       - "${FLOWER_HOST_PORT}:5555"
diff --git a/scripts/in_container/bin/run_tmux b/scripts/in_container/bin/run_tmux
index 40fc695a643b0..fce52fc2d21da 100755
--- a/scripts/in_container/bin/run_tmux
+++ b/scripts/in_container/bin/run_tmux
@@ -56,8 +56,17 @@ tmux split-window -v
 tmux select-pane -t 1
 tmux send-keys 'airflow scheduler' C-m
 
-tmux split-window -h
+
 tmux select-pane -t 2
+tmux split-window -h
+if [[ ${DEV_MODE=} == "true" ]]; then
+    tmux send-keys 'airflow ui-api -d' C-m
+else
+    tmux send-keys 'airflow ui-api' C-m
+fi
+
+tmux split-window -h
+tmux select-pane -t 3
 if [[ ${DEV_MODE=} == "true" ]]; then
     tmux send-keys 'airflow webserver -d' C-m
 else
diff --git a/tests/cli/commands/_common_cli_classes.py b/tests/cli/commands/_common_cli_classes.py
index a5f78e9cfcb25..77dfd41707fd6 100644
--- a/tests/cli/commands/_common_cli_classes.py
+++ b/tests/cli/commands/_common_cli_classes.py
@@ -33,7 +33,7 @@
 console = Console(width=400, color_system="standard")
 
 
-class _ComonCLIGunicornTestClass:
+class _CommonCLIGunicornTestClass:
     main_process_regexp: str = "process_to_look_for"
 
     @pytest.fixture(autouse=True)
diff --git a/tests/cli/commands/test_internal_api_command.py b/tests/cli/commands/test_internal_api_command.py
index 99992e6266861..2959cfa9f6f59 100644
--- a/tests/cli/commands/test_internal_api_command.py
+++ b/tests/cli/commands/test_internal_api_command.py
@@ -31,7 +31,7 @@
 from airflow.cli.commands import internal_api_command
 from airflow.cli.commands.internal_api_command import GunicornMonitor
 from airflow.settings import _ENABLE_AIP_44
-from tests.cli.commands._common_cli_classes import _ComonCLIGunicornTestClass
+from tests.cli.commands._common_cli_classes import _CommonCLIGunicornTestClass
 from tests.test_utils.config import conf_vars
 
 console = Console(width=400, color_system="standard")
@@ -85,7 +85,7 @@ def test_ready_prefix_on_cmdline_dead_process(self):
 
 @pytest.mark.db_test
 @pytest.mark.skipif(not _ENABLE_AIP_44, reason="AIP-44 is disabled")
-class TestCliInternalAPI(_ComonCLIGunicornTestClass):
+class TestCliInternalAPI(_CommonCLIGunicornTestClass):
     main_process_regexp = r"airflow internal-api"
 
     @pytest.mark.execution_timeout(210)
diff --git 
a/tests/cli/commands/test_ui_api_command.py b/tests/cli/commands/test_ui_api_command.py
new file mode 100644
index 0000000000000..81ee2275bf77b
--- /dev/null
+++ b/tests/cli/commands/test_ui_api_command.py
@@ -0,0 +1,166 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import os
+import subprocess
+import sys
+import time
+from unittest import mock
+
+import psutil
+import pytest
+from rich.console import Console
+
+from airflow.cli.commands import ui_api_command
+from tests.cli.commands._common_cli_classes import _CommonCLIGunicornTestClass
+
+console = Console(width=400, color_system="standard")
+
+
+@pytest.mark.db_test
+class TestCliUiApi(_CommonCLIGunicornTestClass):
+    main_process_regexp = r"airflow ui-api"
+
+    @pytest.mark.execution_timeout(210)
+    def test_cli_ui_api_background(self, tmp_path):
+        parent_path = tmp_path / "gunicorn"
+        parent_path.mkdir()
+        pidfile_ui_api = parent_path / "pidflow-ui-api.pid"
+        pidfile_monitor = parent_path / "pidflow-ui-api-monitor.pid"
+        stdout = parent_path / "airflow-ui-api.out"
+        stderr = parent_path / "airflow-ui-api.err"
+        logfile = parent_path / "airflow-ui-api.log"
+        try:
+            # Run ui-api as daemon in background. Note that the wait method is not called.
+            console.print("[magenta]Starting airflow ui-api --daemon")
+            env = os.environ.copy()
+            proc = subprocess.Popen(
+                [
+                    "airflow",
+                    "ui-api",
+                    "--daemon",
+                    "--pid",
+                    os.fspath(pidfile_ui_api),
+                    "--stdout",
+                    os.fspath(stdout),
+                    "--stderr",
+                    os.fspath(stderr),
+                    "--log-file",
+                    os.fspath(logfile),
+                ],
+                env=env,
+            )
+            assert proc.poll() is None
+
+            pid_monitor = self._wait_pidfile(pidfile_monitor)
+            console.print(f"[blue]Monitor started at {pid_monitor}")
+            pid_ui_api = self._wait_pidfile(pidfile_ui_api)
+            console.print(f"[blue]UI API started at {pid_ui_api}")
+            console.print("[blue]Running airflow ui-api process:")
+            # Assert that the ui-api and gunicorn processes are running (by name rather than pid). 
+            assert self._find_process(r"airflow ui-api --daemon", print_found_process=True)
+            console.print("[blue]Waiting for gunicorn processes:")
+            # wait for gunicorn to start
+            for _ in range(30):
+                if self._find_process(r"^gunicorn"):
+                    break
+                console.print("[blue]Waiting for gunicorn to start ...")
+                time.sleep(1)
+            console.print("[blue]Running gunicorn processes:")
+            assert self._find_all_processes("^gunicorn", print_found_process=True)
+            console.print("[magenta]ui-api process started successfully.")
+            console.print(
+                "[magenta]Terminating monitor process and expecting "
+                "ui-api and gunicorn processes to terminate as well"
+            )
+            proc = psutil.Process(pid_monitor)
+            proc.terminate()
+            assert proc.wait(120) in (0, None)
+            self._check_processes(ignore_running=False)
+            console.print("[magenta]All ui-api and gunicorn processes are terminated.")
+        except Exception:
+            console.print("[red]Exception occurred. Dumping all logs.")
+            # Dump all logs
+            for file in parent_path.glob("*"):
+                console.print(f"Dumping {file} (size: {file.stat().st_size})")
+                console.print(file.read_text())
+            raise
+
+    def test_cli_ui_api_debug(self, app):
+        with mock.patch("subprocess.Popen") as Popen, mock.patch.object(ui_api_command, "GunicornMonitor"):
+            port = "9092"
+            hostname = "somehost"
+            args = self.parser.parse_args(["ui-api", "--port", port, "--hostname", hostname, "--debug"])
+            ui_api_command.ui_api(args)
+
+            Popen.assert_called_with(
+                [
+                    "fastapi",
+                    "dev",
+                    "airflow/api_ui/main.py",
+                    "--port",
+                    port,
+                    "--host",
+                    hostname,
+                ],
+                close_fds=True,
+            )
+
+    def test_cli_ui_api_args(self):
+        with mock.patch("subprocess.Popen") as Popen, mock.patch.object(ui_api_command, "GunicornMonitor"):
+            args = self.parser.parse_args(
+                [
+                    "ui-api",
+                    "--access-logformat",
+                    "custom_log_format",
+                    "--pid",
+                    "/tmp/x.pid",
+                ]
+            )
+            ui_api_command.ui_api(args)
+
+            Popen.assert_called_with(
+                [
+                    sys.executable,
+                    "-m",
+                    "gunicorn",
+                    "--workers",
+                    "4",
+                    "--worker-class",
+                    "airflow.cli.commands.ui_api_command.AirflowUvicornWorker",
+                    "--timeout",
+                    "120",
+                    "--bind",
+                    "0.0.0.0:9091",
+                    "--name",
+                    "airflow-ui-api",
+                    "--pid",
+                    "/tmp/x.pid",
+                    "--access-logfile",
+                    "-",
+                    "--error-logfile",
+                    "-",
+                    "--config",
+                    "python:airflow.api_ui.gunicorn_config",
+                    "--access-logformat",
+                    "custom_log_format",
+                    "airflow.api_ui.app:cached_app()",
+                    "--preload",
+                ],
+                close_fds=True,
+            )
diff --git a/tests/cli/commands/test_webserver_command.py b/tests/cli/commands/test_webserver_command.py
index 07d95a9e5f75a..5531f674689b9 100644
--- a/tests/cli/commands/test_webserver_command.py
+++ b/tests/cli/commands/test_webserver_command.py
@@ -30,7 +30,7 @@
 from airflow.cli import cli_parser
 from airflow.cli.commands import webserver_command
 from airflow.cli.commands.webserver_command import GunicornMonitor
-from tests.cli.commands._common_cli_classes import _ComonCLIGunicornTestClass
+from tests.cli.commands._common_cli_classes import _CommonCLIGunicornTestClass
 from tests.test_utils.config import conf_vars
 
 console = Console(width=400, color_system="standard")
@@ -227,7 +227,7 @@ def test_ready_prefix_on_cmdline_dead_process(self):
 
 @pytest.mark.db_test
-class TestCliWebServer(_ComonCLIGunicornTestClass):
+class TestCliWebServer(_CommonCLIGunicornTestClass):
     main_process_regexp = r"airflow webserver"
 
     @pytest.mark.execution_timeout(400)
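
A quick in-process check of the new app factory for local review (a sketch, not part of the patch): it builds the same app object that the gunicorn invocation loads via "airflow.api_ui.app:cached_app()", but serves it through FastAPI's test client instead of a real server. This assumes a dev environment where airflow, fastapi and httpx are importable; "/ui/datasets" is a hypothetical path used only for illustration, since the exact routes depend on what dataset_router registers under the /ui prefix.

    # Minimal smoke test for the UI API app factory (sketch, not part of the patch).
    from fastapi.testclient import TestClient

    from airflow.api_ui.app import cached_app, purge_cached_app

    # cached_app() builds the app once and caches it at module level,
    # mirroring what the gunicorn workers load.
    client = TestClient(cached_app())

    # Routes are mounted under the /ui prefix by init_views();
    # "/ui/datasets" is a hypothetical path for illustration.
    response = client.get("/ui/datasets")
    print(response.status_code)

    # Drop the module-level cache so the next cached_app() call builds a
    # fresh app, e.g. between test cases.
    purge_cached_app()

Exercising cached_app() directly like this also reflects the --preload behaviour described in the command: the app is constructed once up front rather than once per forked worker.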