Skip to content

Commit

Permalink
Fixed thread local _sentinel.callers defect and added test cases (apa…
Browse files Browse the repository at this point in the history
…che#44646)

* Update base.py

* Update base.py

* Update base.py

* Update baseoperator.py

* Update base.py

* Update base.py

* Update baseoperator.py

* Update baseoperator.py

* Fixed thread local _sentinel.callers defect and added test cases

* Fixed issue

* Fixed issue

* Fixed issue

* Fixed issue

* Fixed issue

* Fixed issue

* Fixed issue

---------

Co-authored-by: Rahul Goyal <[email protected]>
  • Loading branch information
rahulgoyal2987 and Rahul Goyal authored Dec 11, 2024
1 parent 419d8c6 commit a77fca2
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 0 deletions.
2 changes: 2 additions & 0 deletions airflow/models/baseoperator.py
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,8 @@ def wrapper(self, *args, **kwargs):
sentinel = kwargs.pop(sentinel_key, None)

if sentinel:
if not getattr(cls._sentinel, "callers", None):
cls._sentinel.callers = {}
cls._sentinel.callers[sentinel_key] = sentinel
else:
sentinel = cls._sentinel.callers.pop(f"{func.__qualname__.split('.')[0]}__sentinel", None)
Expand Down
18 changes: 18 additions & 0 deletions tests/models/test_baseoperatormeta.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from __future__ import annotations

import datetime
import threading
from typing import TYPE_CHECKING, Any
from unittest.mock import patch

Expand Down Expand Up @@ -204,3 +205,20 @@ def say_hello(**context):
mock_log.warning.assert_called_once_with(
"HelloWorldOperator.execute cannot be called outside TaskInstance!"
)

def test_thread_local_executor_safeguard(self):
class TestExecutorSafeguardThread(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
self.executor_safeguard = ExecutorSafeguard()

def run(self):
class Wrapper:
def wrapper_test_func(self, *args, **kwargs):
print("test")

wrap_func = self.executor_safeguard.decorator(Wrapper.wrapper_test_func)
wrap_func(Wrapper(), Wrapper__sentinel="abc")

# Test thread local caller value is set properly
TestExecutorSafeguardThread().start()

0 comments on commit a77fca2

Please sign in to comment.