Skip to content

Commit

Permalink
Fix sentinel_kwargs load from ENV (apache#36318)
Browse files Browse the repository at this point in the history
* fix(providers/centry): sentinel_kwargs fron env

* test: add sentinel kwargs test
  • Loading branch information
vaaalik authored Dec 20, 2023
1 parent 23b8753 commit e2393ee
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 5 deletions.
10 changes: 5 additions & 5 deletions airflow/providers/celery/executors/default_celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
"""Default celery configuration."""
from __future__ import annotations

import json
import logging
import ssl

Expand Down Expand Up @@ -47,18 +48,17 @@ def _broker_supports_visibility_timeout(url):

broker_url = conf.get("celery", "BROKER_URL", fallback="redis://redis:6379/0")

broker_transport_options = conf.getsection("celery_broker_transport_options") or {}
broker_transport_options: dict = conf.getsection("celery_broker_transport_options") or {}
if "visibility_timeout" not in broker_transport_options:
if _broker_supports_visibility_timeout(broker_url):
broker_transport_options["visibility_timeout"] = 21600

broker_transport_options_for_celery: dict = broker_transport_options.copy()
if "sentinel_kwargs" in broker_transport_options:
try:
sentinel_kwargs = broker_transport_options.get("sentinel_kwargs")
sentinel_kwargs = json.loads(broker_transport_options["sentinel_kwargs"])
if not isinstance(sentinel_kwargs, dict):
raise ValueError
broker_transport_options_for_celery["sentinel_kwargs"] = sentinel_kwargs
broker_transport_options["sentinel_kwargs"] = sentinel_kwargs
except Exception:
raise AirflowException("sentinel_kwargs should be written in the correct dictionary format.")

Expand All @@ -77,7 +77,7 @@ def _broker_supports_visibility_timeout(url):
"task_default_exchange": conf.get("operators", "DEFAULT_QUEUE"),
"task_track_started": conf.getboolean("celery", "task_track_started", fallback=True),
"broker_url": broker_url,
"broker_transport_options": broker_transport_options_for_celery,
"broker_transport_options": broker_transport_options,
"result_backend": result_backend,
"database_engine_options": conf.getjson(
"celery", "result_backend_sqlalchemy_engine_options", fallback={}
Expand Down
11 changes: 11 additions & 0 deletions tests/providers/celery/executors/test_celery_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -347,3 +347,14 @@ def test_celery_executor_with_no_recommended_result_backend(caplog):
"You have configured a result_backend using the protocol `rediss`,"
" it is highly recommended to use an alternative result_backend (i.e. a database)."
) in caplog.text


@conf_vars({("celery_broker_transport_options", "sentinel_kwargs"): '{"service_name": "mymaster"}'})
def test_sentinel_kwargs_loaded_from_string():
import importlib

# reload celery conf to apply the new config
importlib.reload(default_celery)
assert default_celery.DEFAULT_CELERY_CONFIG["broker_transport_options"]["sentinel_kwargs"] == {
"service_name": "mymaster"
}

0 comments on commit e2393ee

Please sign in to comment.