From 0094e6a07c8a8e8c710832daaf77f9b346d39ba0 Mon Sep 17 00:00:00 2001 From: czgu Date: Wed, 11 Oct 2023 14:15:22 -0700 Subject: [PATCH] Use gevent for celery worker, update timeout to GeventTimeout --- containers/docker-compose.prod.yml | 2 +- docker-compose.yml | 2 +- querybook/scripts/run_test | 2 +- querybook/server/app/flask_app.py | 1 - .../server/datasources/query_execution.py | 1 - .../connection_checker.py | 4 ++-- .../select_one_checker.py | 4 ++-- .../server/lib/query_executor/clients/hive.py | 4 ++-- querybook/server/lib/utils/utils.py | 21 +++++++++++++++++++ querybook/server/runweb.py | 4 ---- querybook/server/tasks/run_query.py | 1 - .../tests/test_lib/test_utils/test_utils.py | 11 ++++++++++ 12 files changed, 41 insertions(+), 16 deletions(-) create mode 100644 querybook/tests/test_lib/test_utils/test_utils.py diff --git a/containers/docker-compose.prod.yml b/containers/docker-compose.prod.yml index ec93107d9..5ced2c9de 100644 --- a/containers/docker-compose.prod.yml +++ b/containers/docker-compose.prod.yml @@ -23,7 +23,7 @@ services: worker: extends: service: base - command: ./querybook/scripts/runservice prod_worker -c 150 -l info -Ofair -n celery@%h + command: ./querybook/scripts/runservice prod_worker -P gevent -c 150 -l info -Ofair -n celery@%h scheduler: extends: service: base diff --git a/docker-compose.yml b/docker-compose.yml index 1e36f3d86..98759fd8c 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -43,7 +43,7 @@ services: image: *querybook-image tty: true stdin_open: true - command: './querybook/scripts/runservice worker -c 5' + command: './querybook/scripts/runservice worker -P gevent -c 5' volumes: *querybook-volumes depends_on: *querybook-depends-on scheduler: diff --git a/querybook/scripts/run_test b/querybook/scripts/run_test index baefbdc46..b3e90af97 100755 --- a/querybook/scripts/run_test +++ b/querybook/scripts/run_test @@ -17,7 +17,7 @@ usage() { run_python_unit_test() { echo 'Start running python unit tests >>>>>>>>>>>>>>>>>>>>>>>>>>>>' - py.test querybook/tests || exit 1 + python -m gevent.monkey --module pytest querybook/tests || exit 1 } run_webpack_test() { diff --git a/querybook/server/app/flask_app.py b/querybook/server/app/flask_app.py index f64bdb7c8..3c0b75348 100644 --- a/querybook/server/app/flask_app.py +++ b/querybook/server/app/flask_app.py @@ -66,7 +66,6 @@ def make_celery(app): worker_max_tasks_per_child=1, task_track_started=True, task_soft_time_limit=172800, - worker_proc_alive_timeout=60, broker_transport_options={ # This must be higher than soft time limit, # otherwise the task will get retried (in the case of acks_late=True) diff --git a/querybook/server/datasources/query_execution.py b/querybook/server/datasources/query_execution.py index 83ddeb180..8c3515cdb 100644 --- a/querybook/server/datasources/query_execution.py +++ b/querybook/server/datasources/query_execution.py @@ -180,7 +180,6 @@ def cancel_query_and_notify(): "RECEIVED", # Rare case where task is received but not yet start "RETRY", # Very unlikely case, because query normally do not retry ): - task.revoke(terminate=True) # last attempt to cancel it cancel_query_and_notify() elif task.state == "ABORTED": diff --git a/querybook/server/lib/engine_status_checker/connection_checker.py b/querybook/server/lib/engine_status_checker/connection_checker.py index 52a12ff8c..1a18fde43 100644 --- a/querybook/server/lib/engine_status_checker/connection_checker.py +++ b/querybook/server/lib/engine_status_checker/connection_checker.py @@ -4,7 +4,7 @@ from .base_checker import BaseEngineStatusChecker, EngineStatus from const.query_execution import QueryEngineStatus from lib.query_executor.base_executor import QueryExecutorBaseClass -from lib.utils.utils import Timeout +from lib.utils.utils import GeventTimeout class ConnectionChecker(BaseEngineStatusChecker): @@ -24,7 +24,7 @@ def check_connection( ) -> EngineStatus: result: EngineStatus = {"status": QueryEngineStatus.GOOD.value, "messages": []} try: - with Timeout(20, "Connection took too long"): + with GeventTimeout(20, "Connection took too long"): cursor = executor._get_client(client_settings).cursor() utc_now_str = datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S") result["messages"].append( diff --git a/querybook/server/lib/engine_status_checker/select_one_checker.py b/querybook/server/lib/engine_status_checker/select_one_checker.py index c41986937..88a096868 100644 --- a/querybook/server/lib/engine_status_checker/select_one_checker.py +++ b/querybook/server/lib/engine_status_checker/select_one_checker.py @@ -5,7 +5,7 @@ from const.query_execution import QueryEngineStatus from lib.query_executor.base_executor import QueryExecutorBaseClass from lib.query_executor.base_client import CursorBaseClass -from lib.utils.utils import Timeout, TimeoutError +from lib.utils.utils import GeventTimeout, TimeoutError class SelectOneChecker(BaseEngineStatusChecker): @@ -29,7 +29,7 @@ def check_select_one( ) -> EngineStatus: result: EngineStatus = {"status": QueryEngineStatus.GOOD.value, "messages": []} try: - with Timeout(20, "Select 1 took too long"): + with GeventTimeout(20, "Select 1 took too long"): cursor: CursorBaseClass = executor._get_client(client_settings).cursor() cursor.run("select 1") cursor.poll_until_finish() diff --git a/querybook/server/lib/query_executor/clients/hive.py b/querybook/server/lib/query_executor/clients/hive.py index 6d4fd9714..b390a64ea 100644 --- a/querybook/server/lib/query_executor/clients/hive.py +++ b/querybook/server/lib/query_executor/clients/hive.py @@ -3,7 +3,7 @@ from pyhive import hive from TCLIService.ttypes import TOperationState -from lib.utils.utils import Timeout +from lib.utils.utils import GeventTimeout from lib.query_executor.base_client import ClientBaseClass, CursorBaseClass from lib.query_executor.connection_string.hive import get_hive_connection_conf @@ -21,7 +21,7 @@ def __init__( *args, **kwargs ): - with Timeout(120, "Timeout connecting to HiveServer"): + with GeventTimeout(120, "Timeout connecting to HiveServer"): connection_conf = get_hive_connection_conf(connection_string) port = 10000 if not connection_conf.port else connection_conf.port diff --git a/querybook/server/lib/utils/utils.py b/querybook/server/lib/utils/utils.py index e38a4549b..bc31ba08c 100644 --- a/querybook/server/lib/utils/utils.py +++ b/querybook/server/lib/utils/utils.py @@ -1,8 +1,12 @@ +from contextlib import contextmanager import inspect import signal import subprocess from datetime import datetime, date from functools import wraps +from typing import Optional, Union + +import gevent from lib.logger import get_logger @@ -91,6 +95,23 @@ class TimeoutError(Exception): pass +@contextmanager +def GeventTimeout( + sec: Union[int, float] = 1, custom_error_message: Optional[str] = None +): + """This timeout function can be used in gevent celery worker or the web server (which is powered by gevent)""" + + error_message = custom_error_message or f"Timeout Exception: {sec} seconds" + timeout = gevent.Timeout(sec, TimeoutError(error_message)) + timeout.start() + + try: + yield + finally: + timeout.close() + + +# Deprecated: use GeventTimeout if possible, the Timeout would break in gevent worker class Timeout: def __init__(self, sec, custom_error_message=None): self.error_message = custom_error_message or f"Timeout Exception: {sec} seconds" diff --git a/querybook/server/runweb.py b/querybook/server/runweb.py index 041d4edea..0518a0d79 100644 --- a/querybook/server/runweb.py +++ b/querybook/server/runweb.py @@ -1,7 +1,3 @@ -"""This file is for dev server only. - DO NOT USE FOR PROD -""" - from gevent import monkey monkey.patch_all() diff --git a/querybook/server/tasks/run_query.py b/querybook/server/tasks/run_query.py index d93fdc493..20f379169 100644 --- a/querybook/server/tasks/run_query.py +++ b/querybook/server/tasks/run_query.py @@ -17,7 +17,6 @@ from logic.elasticsearch import update_query_execution_by_id from tasks.log_query_per_table import log_query_per_table_task - LOG = get_task_logger(__name__) diff --git a/querybook/tests/test_lib/test_utils/test_utils.py b/querybook/tests/test_lib/test_utils/test_utils.py new file mode 100644 index 000000000..6b0e5db33 --- /dev/null +++ b/querybook/tests/test_lib/test_utils/test_utils.py @@ -0,0 +1,11 @@ +import time + +import pytest + +from lib.utils.utils import GeventTimeout, TimeoutError + + +def test_timeout(): + with pytest.raises(TimeoutError): + with GeventTimeout(0.1): + time.sleep(1)