diff --git a/airflow/executors/base_executor.py b/airflow/executors/base_executor.py index 1ba0ed15567ba..a50dd801f9e68 100644 --- a/airflow/executors/base_executor.py +++ b/airflow/executors/base_executor.py @@ -128,9 +128,10 @@ class BaseExecutor(LoggingMixin): name: None | ExecutorName = None callback_sink: BaseCallbackSink | None = None - def __init__(self, parallelism: int = PARALLELISM): + def __init__(self, parallelism: int = PARALLELISM, team_id: str | None = None): super().__init__() self.parallelism: int = parallelism + self.team_id: str | None = team_id self.queued_tasks: dict[TaskInstanceKey, QueuedTaskInstanceType] = {} self.running: set[TaskInstanceKey] = set() self.event_buffer: dict[TaskInstanceKey, EventBufferValueType] = {} diff --git a/airflow/executors/executor_loader.py b/airflow/executors/executor_loader.py index cd025e38c8d65..9355fbb26a5ae 100644 --- a/airflow/executors/executor_loader.py +++ b/airflow/executors/executor_loader.py @@ -46,6 +46,8 @@ # executor may have both so we need two lookup dicts. _alias_to_executors: dict[str, ExecutorName] = {} _module_to_executors: dict[str, ExecutorName] = {} +# Used to lookup an ExecutorName via the team id. +_team_id_to_executors: dict[str | None, ExecutorName] = {} _classname_to_executors: dict[str, ExecutorName] = {} # Used to cache the computed ExecutorNames so that we don't need to read/parse config more than once _executor_names: list[ExecutorName] = [] @@ -79,53 +81,69 @@ def _get_executor_names(cls) -> list[ExecutorName]: if _executor_names: return _executor_names - executor_names_raw = conf.get_mandatory_list_value("core", "EXECUTOR") + all_executor_names: list[tuple[None | str, list[str]]] = [ + (None, conf.get_mandatory_list_value("core", "EXECUTOR")) + ] + all_executor_names.extend(cls._get_team_executor_configs()) executor_names = [] - for name in executor_names_raw: - if len(split_name := name.split(":")) == 1: - name = split_name[0] - # Check if this is an alias for a core airflow executor, module - # paths won't be provided by the user in that case. - if core_executor_module := cls.executors.get(name): - executor_names.append(ExecutorName(alias=name, module_path=core_executor_module)) - # A module path was provided + for team_id, executor_names_config in all_executor_names: + executor_names_per_team = [] + for name in executor_names_config: + if len(split_name := name.split(":")) == 1: + name = split_name[0] + # Check if this is an alias for a core airflow executor, module + # paths won't be provided by the user in that case. + if core_executor_module := cls.executors.get(name): + executor_names_per_team.append( + ExecutorName(module_path=core_executor_module, alias=name, team_id=team_id) + ) + # A module path was provided + else: + executor_names_per_team.append( + ExecutorName(alias=None, module_path=name, team_id=team_id) + ) + # An alias was provided with the module path + elif len(split_name) == 2: + # Ensure the user is not trying to override the existing aliases of any of the core + # executors by providing an alias along with the existing core airflow executor alias + # (e.g. my_local_exec_alias:LocalExecutor). Allowing this makes things unnecessarily + # complicated. Multiple Executors of the same type will be supported by a future + # multitenancy AIP. + # The module component should always be a module path. + module_path = split_name[1] + if not module_path or module_path in CORE_EXECUTOR_NAMES or "." not in module_path: + raise AirflowConfigException( + "Incorrectly formatted executor configuration. Second portion of an executor " + f"configuration must be a module path but received: {module_path}" + ) + else: + executor_names_per_team.append( + ExecutorName(alias=split_name[0], module_path=split_name[1], team_id=team_id) + ) else: - executor_names.append(ExecutorName(alias=None, module_path=name)) - # An alias was provided with the module path - elif len(split_name) == 2: - # Ensure the user is not trying to override the existing aliases of any of the core - # executors by providing an alias along with the existing core airflow executor alias - # (e.g. my_local_exec_alias:LocalExecutor). Allowing this makes things unnecessarily - # complicated. Multiple Executors of the same type will be supported by a future multitenancy - # AIP. - # The module component should always be a module path. - module_path = split_name[1] - if not module_path or module_path in CORE_EXECUTOR_NAMES or "." not in module_path: - raise AirflowConfigException( - "Incorrectly formatted executor configuration. Second portion of an executor " - f"configuration must be a module path but received: {module_path}" - ) - else: - executor_names.append(ExecutorName(alias=split_name[0], module_path=split_name[1])) - else: - raise AirflowConfigException(f"Incorrectly formatted executor configuration: {name}") - - # As of now, we do not allow duplicate executors. - # Add all module paths to a set, since the actual code is what is unique - unique_modules = set([exec_name.module_path for exec_name in executor_names]) - if len(unique_modules) < len(executor_names): - msg = ( - "At least one executor was configured twice. Duplicate executors are not yet supported. " - "Please check your configuration again to correct the issue." - ) - raise AirflowConfigException(msg) + raise AirflowConfigException(f"Incorrectly formatted executor configuration: {name}") + + # As of now, we do not allow duplicate executors (within teams). + # Add all module paths to a set, since the actual code is what is unique + unique_modules = set([exec_name.module_path for exec_name in executor_names_per_team]) + if len(unique_modules) < len(executor_names_per_team): + msg = ( + "At least one executor was configured twice. Duplicate executors are not yet supported.\n" + "Please check your configuration again to correct the issue." + ) + raise AirflowConfigException(msg) + + executor_names.extend(executor_names_per_team) # Populate some mappings for fast future lookups for executor_name in executor_names: # Executors will not always have aliases if executor_name.alias: _alias_to_executors[executor_name.alias] = executor_name + # All executors will have a team id. It _may_ be None, for now that means it is a system + # level executor + _team_id_to_executors[executor_name.team_id] = executor_name # All executors will have a module path _module_to_executors[executor_name.module_path] = executor_name _classname_to_executors[executor_name.module_path.split(".")[-1]] = executor_name @@ -134,6 +152,39 @@ def _get_executor_names(cls) -> list[ExecutorName]: return executor_names + @classmethod + def block_use_of_multi_team(cls): + """ + Raise an exception if the user tries to use multiple team based executors. + + Before the feature is complete we do not want users to accidentally configure this. + This can be overridden by setting the AIRFLOW__DEV__MULTI_TEAM_MODE environment + variable to "enabled" + This check is built into a method so that it can be easily mocked in unit tests. + """ + team_dev_mode: str | None = os.environ.get("AIRFLOW__DEV__MULTI_TEAM_MODE") + if not team_dev_mode or team_dev_mode != "enabled": + raise AirflowConfigException("Configuring multiple team based executors is not yet supported!") + + @classmethod + def _get_team_executor_configs(cls) -> list[tuple[str, list[str]]]: + """ + Return a list of executor configs to be loaded. + + Each tuple contains the team id as the first element and the second element is the executor config + for that team (a list of executor names/modules/aliases). + """ + from airflow.configuration import conf + + team_config = conf.get("core", "multi_team_config_files", fallback=None) + configs = [] + if team_config: + cls.block_use_of_multi_team() + for team in team_config.split(","): + (_, team_id) = team.split(":") + configs.append((team_id, conf.get_mandatory_list_value("core", "executor", team_id=team_id))) + return configs + @classmethod def get_executor_names(cls) -> list[ExecutorName]: """ @@ -166,7 +217,7 @@ def init_executors(cls) -> list[BaseExecutor]: executor_names = cls._get_executor_names() loaded_executors = [] for executor_name in executor_names: - loaded_executor = cls.load_executor(executor_name.module_path) + loaded_executor = cls.load_executor(executor_name) if executor_name.alias: cls.executors[executor_name.alias] = executor_name.module_path else: @@ -216,7 +267,10 @@ def load_executor(cls, executor_name: ExecutorName | str | None) -> BaseExecutor else: executor_cls, import_source = cls.import_executor_cls(_executor_name) log.debug("Loading executor %s from %s", _executor_name, import_source.value) - executor = executor_cls() + if _executor_name.team_id: + executor = executor_cls(team_id=_executor_name.team_id) + else: + executor = executor_cls() except ImportError as e: log.error(e) diff --git a/airflow/executors/executor_utils.py b/airflow/executors/executor_utils.py index 016e01d8d0c3f..4d34a4144881c 100644 --- a/airflow/executors/executor_utils.py +++ b/airflow/executors/executor_utils.py @@ -23,35 +23,42 @@ class ExecutorName(LoggingMixin): """Representation of an executor config/name.""" - def __init__(self, module_path, alias=None): - self.module_path = module_path - self.alias = alias + def __init__(self, module_path: str, alias: str | None = None, team_id: str | None = None) -> None: + self.module_path: str = module_path + self.alias: str | None = alias + self.team_id: str | None = team_id self.set_connector_source() - def set_connector_source(self): + def set_connector_source(self) -> None: if self.alias in CORE_EXECUTOR_NAMES: self.connector_source = ConnectorSource.CORE else: # Executor must be a module self.connector_source = ConnectorSource.CUSTOM_PATH - def __repr__(self): + def __repr__(self) -> str: """Implement repr.""" if self.alias in CORE_EXECUTOR_NAMES: - return self.alias - return f"{self.alias}:{self.module_path}" if self.alias else f"{self.module_path}" + # This is a "core executor" we can refer to it by its known short name + return f"{self.team_id if self.team_id else ''}:{self.alias}:" + return ( + f"{self.team_id if self.team_id else ''}:" + f"{self.alias if self.alias else ''}:" + f"{self.module_path}" + ) - def __eq__(self, other): + def __eq__(self, other) -> bool: """Implement eq.""" if ( self.alias == other.alias and self.module_path == other.module_path and self.connector_source == other.connector_source + and self.team_id == other.team_id ): return True else: return False - def __hash__(self): + def __hash__(self) -> int: """Implement hash.""" return hash(self.__repr__()) diff --git a/tests/cli/test_cli_parser.py b/tests/cli/test_cli_parser.py index 23363b379d8c7..3bfbaeb23bd1f 100644 --- a/tests/cli/test_cli_parser.py +++ b/tests/cli/test_cli_parser.py @@ -237,7 +237,7 @@ def test_hybrid_executor_get_cli_commands_with_error( assert celery_executor_command.name in commands assert ecs_executor_command.name not in commands assert ( - "Failed to load CLI commands from executor: airflow.providers.amazon.aws.executors.ecs.ecs_executor.AwsEcsExecutor" + "Failed to load CLI commands from executor: ::airflow.providers.amazon.aws.executors.ecs.ecs_executor.AwsEcsExecutor" in caplog.messages[0] ) @@ -265,7 +265,7 @@ def test_cli_parser_fail_to_load_executor(self, ecs_executor_cli_commands_mock, commands = [command.name for command in cli_parser.airflow_commands] assert ecs_executor_command.name in commands assert ( - "Failed to load CLI commands from executor: airflow.providers.incorrect.executor.Executor" + "Failed to load CLI commands from executor: ::airflow.providers.incorrect.executor.Executor" in caplog.messages[0] ) diff --git a/tests/executors/test_executor_loader.py b/tests/executors/test_executor_loader.py index 48c531c5cb942..87455bd841b3d 100644 --- a/tests/executors/test_executor_loader.py +++ b/tests/executors/test_executor_loader.py @@ -83,21 +83,46 @@ def test_should_support_custom_path(self): assert executor.name.connector_source == ConnectorSource.CUSTOM_PATH @pytest.mark.parametrize( - ("executor_config", "expected_executors_list"), + ("executor_config", "team_executor_config", "expected_executors_list"), [ - # Just one executor - ( + pytest.param( "CeleryExecutor", + [], [ ExecutorName( "airflow.providers.celery.executors.celery_executor.CeleryExecutor", "CeleryExecutor", ), ], + id="one_executor", ), - # Core executors and custom module path executor - ( + pytest.param( + "CeleryExecutor", + [ + ("team_a", ["CeleryExecutor"]), + ("team_b", ["LocalExecutor"]), + ], + [ + ExecutorName( + "airflow.providers.celery.executors.celery_executor.CeleryExecutor", + "CeleryExecutor", + ), + ExecutorName( + "airflow.providers.celery.executors.celery_executor.CeleryExecutor", + "CeleryExecutor", + "team_a", + ), + ExecutorName( + "airflow.executors.local_executor.LocalExecutor", + "LocalExecutor", + "team_b", + ), + ], + id="one_executor_per_team", + ), + pytest.param( "CeleryExecutor, LocalExecutor, tests.executors.test_executor_loader.FakeExecutor", + [], [ ExecutorName( "airflow.providers.celery.executors.celery_executor.CeleryExecutor", @@ -112,12 +137,50 @@ def test_should_support_custom_path(self): None, ), ], + id="core_executors_and_custom_module_path_executor", + ), + pytest.param( + "CeleryExecutor, LocalExecutor, tests.executors.test_executor_loader.FakeExecutor", + [ + ("team_a", ["CeleryExecutor", "tests.executors.test_executor_loader.FakeExecutor"]), + ("team_b", ["tests.executors.test_executor_loader.FakeExecutor"]), + ], + [ + ExecutorName( + "airflow.providers.celery.executors.celery_executor.CeleryExecutor", + "CeleryExecutor", + ), + ExecutorName( + "airflow.executors.local_executor.LocalExecutor", + "LocalExecutor", + ), + ExecutorName( + "tests.executors.test_executor_loader.FakeExecutor", + None, + ), + ExecutorName( + "airflow.providers.celery.executors.celery_executor.CeleryExecutor", + "CeleryExecutor", + "team_a", + ), + ExecutorName( + "tests.executors.test_executor_loader.FakeExecutor", + None, + "team_a", + ), + ExecutorName( + "tests.executors.test_executor_loader.FakeExecutor", + None, + "team_b", + ), + ], + id="core_executors_and_custom_module_path_executor_per_team", ), - # Core executors and custom module path executor with aliases - ( + pytest.param( ( "CeleryExecutor, LocalExecutor, fake_exec:tests.executors.test_executor_loader.FakeExecutor" ), + [], [ ExecutorName( "airflow.providers.celery.executors.celery_executor.CeleryExecutor", @@ -132,13 +195,62 @@ def test_should_support_custom_path(self): "fake_exec", ), ], + id="core_executors_and_custom_module_path_executor_with_aliases", + ), + pytest.param( + ( + "CeleryExecutor, LocalExecutor, fake_exec:tests.executors.test_executor_loader.FakeExecutor" + ), + [ + ( + "team_a", + ["CeleryExecutor", "fake_exec:tests.executors.test_executor_loader.FakeExecutor"], + ), + ("team_b", ["fake_exec:tests.executors.test_executor_loader.FakeExecutor"]), + ], + [ + ExecutorName( + "airflow.providers.celery.executors.celery_executor.CeleryExecutor", + "CeleryExecutor", + ), + ExecutorName( + "airflow.executors.local_executor.LocalExecutor", + "LocalExecutor", + ), + ExecutorName( + "tests.executors.test_executor_loader.FakeExecutor", + "fake_exec", + ), + ExecutorName( + "airflow.providers.celery.executors.celery_executor.CeleryExecutor", + "CeleryExecutor", + "team_a", + ), + ExecutorName( + "tests.executors.test_executor_loader.FakeExecutor", + "fake_exec", + "team_a", + ), + ExecutorName( + "tests.executors.test_executor_loader.FakeExecutor", + "fake_exec", + "team_b", + ), + ], + id="core_executors_and_custom_module_path_executor_with_aliases_per_team", ), ], ) - def test_get_hybrid_executors_from_config(self, executor_config, expected_executors_list): + def test_get_hybrid_executors_from_config( + self, executor_config, team_executor_config, expected_executors_list + ): with conf_vars({("core", "executor"): executor_config}): - executors = ExecutorLoader._get_executor_names() - assert executors == expected_executors_list + with mock.patch( + "airflow.executors.executor_loader.ExecutorLoader._get_team_executor_configs", + return_value=team_executor_config, + ): + executors = ExecutorLoader._get_executor_names() + assert executors == expected_executors_list def test_init_executors(self): with conf_vars({("core", "executor"): "CeleryExecutor"}): diff --git a/tests/executors/test_executor_utils.py b/tests/executors/test_executor_utils.py index 395eb02deaf29..91e95727881a3 100644 --- a/tests/executors/test_executor_utils.py +++ b/tests/executors/test_executor_utils.py @@ -18,28 +18,110 @@ import pytest -from airflow.executors.executor_loader import ExecutorName +from airflow.executors.executor_constants import LOCAL_EXECUTOR, ConnectorSource +from airflow.executors.executor_loader import ExecutorLoader, ExecutorName + +CORE_EXEC_ALIAS = LOCAL_EXECUTOR +CORE_EXEC_MODULE_PATH = ExecutorLoader.executors[CORE_EXEC_ALIAS] +CORE_EXEC_TEAM_ID = "team_a" +CUSTOM_EXEC_MODULE_PATH = "custom.module.path" +CUSTOM_EXEC_ALIAS = "custom_executor" +CUSTOM_EXEC_TEAM_ID = "team_b" class TestExecutorName: - def test_executor_name_repr_alias(self): - executor_name = ExecutorName(module_path="foo.bar", alias="my_alias") - assert str(executor_name) == "my_alias:foo.bar" - - def test_executor_name_repr_no_alias(self): - executor_name = ExecutorName(module_path="foo.bar") - assert str(executor_name) == "foo.bar" - - @pytest.mark.parametrize( - ("name_args_1", "name_args_2", "expected_result"), - [ - (["foo.bar", "my_alias"], ["foo.bar", "my_alias"], True), - (["foo.bar"], ["foo.bar"], True), - (["foo.bar"], ["foo.bar", "my_alias"], False), - ], - ) - def test_executor_name_eq(self, name_args_1, name_args_2, expected_result): - executor_name_1 = ExecutorName(*name_args_1) - executor_name_2 = ExecutorName(*name_args_2) - eq_result = executor_name_1 == executor_name_2 - assert eq_result == expected_result + @pytest.fixture + def core_executor(self): + return ExecutorName(alias=CORE_EXEC_ALIAS, module_path=CORE_EXEC_MODULE_PATH) + + @pytest.fixture + def core_executor_team_id(self): + return ExecutorName( + alias=CORE_EXEC_ALIAS, module_path=CORE_EXEC_MODULE_PATH, team_id=CORE_EXEC_TEAM_ID + ) + + @pytest.fixture + def custom_executor(self): + return ExecutorName(module_path=CUSTOM_EXEC_MODULE_PATH) + + @pytest.fixture + def custom_executor_alias(self): + return ExecutorName(module_path=CUSTOM_EXEC_MODULE_PATH, alias=CUSTOM_EXEC_ALIAS) + + @pytest.fixture + def custom_executor_team_id(self): + return ExecutorName(module_path=CUSTOM_EXEC_MODULE_PATH, team_id=CUSTOM_EXEC_TEAM_ID) + + @pytest.fixture + def custom_executor_team_id_alias(self): + return ExecutorName( + module_path=CUSTOM_EXEC_MODULE_PATH, alias=CUSTOM_EXEC_ALIAS, team_id=CUSTOM_EXEC_TEAM_ID + ) + + def test_initialization( + self, + core_executor, + core_executor_team_id, + custom_executor, + custom_executor_team_id, + custom_executor_alias, + custom_executor_team_id_alias, + ): + assert core_executor.module_path == CORE_EXEC_MODULE_PATH + assert core_executor.alias is CORE_EXEC_ALIAS + assert core_executor.team_id is None + assert core_executor.connector_source == ConnectorSource.CORE + + assert core_executor_team_id.module_path == CORE_EXEC_MODULE_PATH + assert core_executor_team_id.alias is CORE_EXEC_ALIAS + assert core_executor_team_id.team_id == CORE_EXEC_TEAM_ID + assert core_executor_team_id.connector_source == ConnectorSource.CORE + + assert custom_executor.module_path == CUSTOM_EXEC_MODULE_PATH + assert custom_executor.alias is None + assert custom_executor.team_id is None + assert custom_executor.connector_source == ConnectorSource.CUSTOM_PATH + + assert custom_executor_team_id.module_path == CUSTOM_EXEC_MODULE_PATH + assert custom_executor_team_id.alias is None + assert custom_executor_team_id.team_id == CUSTOM_EXEC_TEAM_ID + assert custom_executor_team_id.connector_source == ConnectorSource.CUSTOM_PATH + + assert custom_executor_alias.module_path == CUSTOM_EXEC_MODULE_PATH + assert custom_executor_alias.alias == CUSTOM_EXEC_ALIAS + assert custom_executor_alias.team_id is None + assert custom_executor_alias.connector_source == ConnectorSource.CUSTOM_PATH + + assert custom_executor_team_id_alias.module_path == CUSTOM_EXEC_MODULE_PATH + assert custom_executor_team_id_alias.alias == CUSTOM_EXEC_ALIAS + assert custom_executor_team_id_alias.team_id == CUSTOM_EXEC_TEAM_ID + assert custom_executor_team_id_alias.connector_source == ConnectorSource.CUSTOM_PATH + + def test_repr_all(self, core_executor, core_executor_team_id, custom_executor_team_id_alias): + assert repr(core_executor) == f":{CORE_EXEC_ALIAS}:" + assert repr(core_executor_team_id) == f"{CORE_EXEC_TEAM_ID}:{CORE_EXEC_ALIAS}:" + assert ( + repr(custom_executor_team_id_alias) + == f"{CUSTOM_EXEC_TEAM_ID}:{CUSTOM_EXEC_ALIAS}:{CUSTOM_EXEC_MODULE_PATH}" + ) + + def test_eq_same(self, core_executor_team_id): + compare_exec = ExecutorName( + alias=CORE_EXEC_ALIAS, module_path=CORE_EXEC_MODULE_PATH, team_id=CORE_EXEC_TEAM_ID + ) + + assert core_executor_team_id == compare_exec + + def test_eq_different(self, core_executor, core_executor_team_id, custom_executor_team_id): + assert core_executor != core_executor_team_id + assert core_executor_team_id != custom_executor_team_id + + def test_hash_same(self, core_executor_team_id): + compare_exec = ExecutorName( + alias=CORE_EXEC_ALIAS, module_path=CORE_EXEC_MODULE_PATH, team_id=CORE_EXEC_TEAM_ID + ) + assert hash(core_executor_team_id) == hash(compare_exec) + + def test_hash_different(self, core_executor, core_executor_team_id, custom_executor_team_id_alias): + assert hash(core_executor) != hash(core_executor_team_id) + assert hash(core_executor_team_id) != hash(custom_executor_team_id_alias)