Skip to content

Commit

Permalink
Use gevent for celery worker, update timeout to GeventTimeout
Browse files Browse the repository at this point in the history
  • Loading branch information
czgu committed Oct 11, 2023
1 parent 036e8df commit 0094e6a
Show file tree
Hide file tree
Showing 12 changed files with 41 additions and 16 deletions.
2 changes: 1 addition & 1 deletion containers/docker-compose.prod.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ services:
worker:
extends:
service: base
command: ./querybook/scripts/runservice prod_worker -c 150 -l info -Ofair -n celery@%h
command: ./querybook/scripts/runservice prod_worker -P gevent -c 150 -l info -Ofair -n celery@%h
scheduler:
extends:
service: base
Expand Down
2 changes: 1 addition & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ services:
image: *querybook-image
tty: true
stdin_open: true
command: './querybook/scripts/runservice worker -c 5'
command: './querybook/scripts/runservice worker -P gevent -c 5'
volumes: *querybook-volumes
depends_on: *querybook-depends-on
scheduler:
Expand Down
2 changes: 1 addition & 1 deletion querybook/scripts/run_test
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ usage() {

run_python_unit_test() {
echo 'Start running python unit tests >>>>>>>>>>>>>>>>>>>>>>>>>>>>'
py.test querybook/tests || exit 1
python -m gevent.monkey --module pytest querybook/tests || exit 1
}

run_webpack_test() {
Expand Down
1 change: 0 additions & 1 deletion querybook/server/app/flask_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ def make_celery(app):
worker_max_tasks_per_child=1,
task_track_started=True,
task_soft_time_limit=172800,
worker_proc_alive_timeout=60,
broker_transport_options={
# This must be higher than soft time limit,
# otherwise the task will get retried (in the case of acks_late=True)
Expand Down
1 change: 0 additions & 1 deletion querybook/server/datasources/query_execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,6 @@ def cancel_query_and_notify():
"RECEIVED", # Rare case where the task has been received but not yet started
"RETRY", # Very unlikely case, because queries normally do not retry
):

task.revoke(terminate=True) # last attempt to cancel it
cancel_query_and_notify()
elif task.state == "ABORTED":
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from .base_checker import BaseEngineStatusChecker, EngineStatus
from const.query_execution import QueryEngineStatus
from lib.query_executor.base_executor import QueryExecutorBaseClass
from lib.utils.utils import Timeout
from lib.utils.utils import GeventTimeout


class ConnectionChecker(BaseEngineStatusChecker):
Expand All @@ -24,7 +24,7 @@ def check_connection(
) -> EngineStatus:
result: EngineStatus = {"status": QueryEngineStatus.GOOD.value, "messages": []}
try:
with Timeout(20, "Connection took too long"):
with GeventTimeout(20, "Connection took too long"):
cursor = executor._get_client(client_settings).cursor()
utc_now_str = datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S")
result["messages"].append(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from const.query_execution import QueryEngineStatus
from lib.query_executor.base_executor import QueryExecutorBaseClass
from lib.query_executor.base_client import CursorBaseClass
from lib.utils.utils import Timeout, TimeoutError
from lib.utils.utils import GeventTimeout, TimeoutError


class SelectOneChecker(BaseEngineStatusChecker):
Expand All @@ -29,7 +29,7 @@ def check_select_one(
) -> EngineStatus:
result: EngineStatus = {"status": QueryEngineStatus.GOOD.value, "messages": []}
try:
with Timeout(20, "Select 1 took too long"):
with GeventTimeout(20, "Select 1 took too long"):
cursor: CursorBaseClass = executor._get_client(client_settings).cursor()
cursor.run("select 1")
cursor.poll_until_finish()
Expand Down
4 changes: 2 additions & 2 deletions querybook/server/lib/query_executor/clients/hive.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

from pyhive import hive
from TCLIService.ttypes import TOperationState
from lib.utils.utils import Timeout
from lib.utils.utils import GeventTimeout
from lib.query_executor.base_client import ClientBaseClass, CursorBaseClass
from lib.query_executor.connection_string.hive import get_hive_connection_conf

Expand All @@ -21,7 +21,7 @@ def __init__(
*args,
**kwargs
):
with Timeout(120, "Timeout connecting to HiveServer"):
with GeventTimeout(120, "Timeout connecting to HiveServer"):
connection_conf = get_hive_connection_conf(connection_string)

port = 10000 if not connection_conf.port else connection_conf.port
Expand Down
21 changes: 21 additions & 0 deletions querybook/server/lib/utils/utils.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
from contextlib import contextmanager
import inspect
import signal
import subprocess
from datetime import datetime, date
from functools import wraps
from typing import Optional, Union

import gevent

from lib.logger import get_logger

Expand Down Expand Up @@ -91,6 +95,23 @@ class TimeoutError(Exception):
pass


@contextmanager
def GeventTimeout(
    sec: Union[int, float] = 1, custom_error_message: Optional[str] = None
):
    """Bound the enclosed ``with`` block with a gevent timer.

    Intended for code running in the gevent celery worker or in the web
    server (which is powered by gevent).

    Args:
        sec: Number of seconds handed to ``gevent.Timeout``.
        custom_error_message: Text carried by the raised ``TimeoutError``.
            When omitted (or empty), the message is
            ``"Timeout Exception: {sec} seconds"``.

    Raises:
        TimeoutError: Supplied to ``gevent.Timeout`` as the exception to
            raise once the timer expires.
            NOTE(review): gevent delivers timeouts cooperatively, so the
            block presumably has to yield to the gevent hub for this to
            fire -- confirm against the gevent documentation.
    """
    if custom_error_message:
        message = custom_error_message
    else:
        message = f"Timeout Exception: {sec} seconds"

    timer = gevent.Timeout(sec, TimeoutError(message))
    timer.start()
    try:
        yield
    finally:
        # Always release the timer, whether the block finished, timed out,
        # or raised something else.
        timer.close()


# Deprecated: use GeventTimeout where possible; Timeout would break in a gevent worker
class Timeout:
def __init__(self, sec, custom_error_message=None):
self.error_message = custom_error_message or f"Timeout Exception: {sec} seconds"
Expand Down
4 changes: 0 additions & 4 deletions querybook/server/runweb.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,3 @@
"""This file is for dev server only.
DO NOT USE FOR PROD
"""

from gevent import monkey

monkey.patch_all()
Expand Down
1 change: 0 additions & 1 deletion querybook/server/tasks/run_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
from logic.elasticsearch import update_query_execution_by_id
from tasks.log_query_per_table import log_query_per_table_task


LOG = get_task_logger(__name__)


Expand Down
11 changes: 11 additions & 0 deletions querybook/tests/test_lib/test_utils/test_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
import time

import pytest

from lib.utils.utils import GeventTimeout, TimeoutError


def test_timeout():
    """GeventTimeout raises TimeoutError when its block outlasts the limit."""
    # The sleep (1s) is deliberately longer than the 0.1s limit.
    # NOTE(review): this presumably relies on time.sleep being gevent-patched
    # by the test runner -- confirm.
    with pytest.raises(TimeoutError), GeventTimeout(0.1):
        time.sleep(1)

0 comments on commit 0094e6a

Please sign in to comment.