Skip to content

Commit

Permalink
Introduce team id to executor names and loader (apache#44710)
Browse files Browse the repository at this point in the history
When multi-team support (AIP-67) is eventually in place, the scheduler will need to initialize a set of executors for each team. This change is a first step toward that: it updates the ExecutorName data model and the executor loader to be "team aware".
  • Loading branch information
o-nikolas authored Dec 13, 2024
1 parent 007e887 commit 957bd24
Show file tree
Hide file tree
Showing 6 changed files with 340 additions and 84 deletions.
3 changes: 2 additions & 1 deletion airflow/executors/base_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,9 +128,10 @@ class BaseExecutor(LoggingMixin):
name: None | ExecutorName = None
callback_sink: BaseCallbackSink | None = None

def __init__(self, parallelism: int = PARALLELISM):
def __init__(self, parallelism: int = PARALLELISM, team_id: str | None = None):
super().__init__()
self.parallelism: int = parallelism
self.team_id: str | None = team_id
self.queued_tasks: dict[TaskInstanceKey, QueuedTaskInstanceType] = {}
self.running: set[TaskInstanceKey] = set()
self.event_buffer: dict[TaskInstanceKey, EventBufferValueType] = {}
Expand Down
134 changes: 94 additions & 40 deletions airflow/executors/executor_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@
# executor may have both so we need two lookup dicts.
_alias_to_executors: dict[str, ExecutorName] = {}
_module_to_executors: dict[str, ExecutorName] = {}
# Used to lookup an ExecutorName via the team id.
_team_id_to_executors: dict[str | None, ExecutorName] = {}
_classname_to_executors: dict[str, ExecutorName] = {}
# Used to cache the computed ExecutorNames so that we don't need to read/parse config more than once
_executor_names: list[ExecutorName] = []
Expand Down Expand Up @@ -79,53 +81,69 @@ def _get_executor_names(cls) -> list[ExecutorName]:
if _executor_names:
return _executor_names

executor_names_raw = conf.get_mandatory_list_value("core", "EXECUTOR")
all_executor_names: list[tuple[None | str, list[str]]] = [
(None, conf.get_mandatory_list_value("core", "EXECUTOR"))
]
all_executor_names.extend(cls._get_team_executor_configs())

executor_names = []
for name in executor_names_raw:
if len(split_name := name.split(":")) == 1:
name = split_name[0]
# Check if this is an alias for a core airflow executor, module
# paths won't be provided by the user in that case.
if core_executor_module := cls.executors.get(name):
executor_names.append(ExecutorName(alias=name, module_path=core_executor_module))
# A module path was provided
for team_id, executor_names_config in all_executor_names:
executor_names_per_team = []
for name in executor_names_config:
if len(split_name := name.split(":")) == 1:
name = split_name[0]
# Check if this is an alias for a core airflow executor, module
# paths won't be provided by the user in that case.
if core_executor_module := cls.executors.get(name):
executor_names_per_team.append(
ExecutorName(module_path=core_executor_module, alias=name, team_id=team_id)
)
# A module path was provided
else:
executor_names_per_team.append(
ExecutorName(alias=None, module_path=name, team_id=team_id)
)
# An alias was provided with the module path
elif len(split_name) == 2:
# Ensure the user is not trying to override the existing aliases of any of the core
# executors by providing an alias along with the existing core airflow executor alias
# (e.g. my_local_exec_alias:LocalExecutor). Allowing this makes things unnecessarily
# complicated. Multiple Executors of the same type will be supported by a future
# multitenancy AIP.
# The module component should always be a module path.
module_path = split_name[1]
if not module_path or module_path in CORE_EXECUTOR_NAMES or "." not in module_path:
raise AirflowConfigException(
"Incorrectly formatted executor configuration. Second portion of an executor "
f"configuration must be a module path but received: {module_path}"
)
else:
executor_names_per_team.append(
ExecutorName(alias=split_name[0], module_path=split_name[1], team_id=team_id)
)
else:
executor_names.append(ExecutorName(alias=None, module_path=name))
# An alias was provided with the module path
elif len(split_name) == 2:
# Ensure the user is not trying to override the existing aliases of any of the core
# executors by providing an alias along with the existing core airflow executor alias
# (e.g. my_local_exec_alias:LocalExecutor). Allowing this makes things unnecessarily
# complicated. Multiple Executors of the same type will be supported by a future multitenancy
# AIP.
# The module component should always be a module path.
module_path = split_name[1]
if not module_path or module_path in CORE_EXECUTOR_NAMES or "." not in module_path:
raise AirflowConfigException(
"Incorrectly formatted executor configuration. Second portion of an executor "
f"configuration must be a module path but received: {module_path}"
)
else:
executor_names.append(ExecutorName(alias=split_name[0], module_path=split_name[1]))
else:
raise AirflowConfigException(f"Incorrectly formatted executor configuration: {name}")

# As of now, we do not allow duplicate executors.
# Add all module paths to a set, since the actual code is what is unique
unique_modules = set([exec_name.module_path for exec_name in executor_names])
if len(unique_modules) < len(executor_names):
msg = (
"At least one executor was configured twice. Duplicate executors are not yet supported. "
"Please check your configuration again to correct the issue."
)
raise AirflowConfigException(msg)
raise AirflowConfigException(f"Incorrectly formatted executor configuration: {name}")

# As of now, we do not allow duplicate executors (within teams).
# Add all module paths to a set, since the actual code is what is unique
unique_modules = set([exec_name.module_path for exec_name in executor_names_per_team])
if len(unique_modules) < len(executor_names_per_team):
msg = (
"At least one executor was configured twice. Duplicate executors are not yet supported.\n"
"Please check your configuration again to correct the issue."
)
raise AirflowConfigException(msg)

executor_names.extend(executor_names_per_team)

# Populate some mappings for fast future lookups
for executor_name in executor_names:
# Executors will not always have aliases
if executor_name.alias:
_alias_to_executors[executor_name.alias] = executor_name
# All executors will have a team id. It _may_ be None, for now that means it is a system
# level executor
_team_id_to_executors[executor_name.team_id] = executor_name
# All executors will have a module path
_module_to_executors[executor_name.module_path] = executor_name
_classname_to_executors[executor_name.module_path.split(".")[-1]] = executor_name
Expand All @@ -134,6 +152,39 @@ def _get_executor_names(cls) -> list[ExecutorName]:

return executor_names

@classmethod
def block_use_of_multi_team(cls):
    """
    Raise an exception if the user tries to use multiple team based executors.

    Before the feature is complete we do not want users to accidentally configure this.
    This can be overridden by setting the AIRFLOW__DEV__MULTI_TEAM_MODE environment
    variable to "enabled".

    This check is built into a method so that it can be easily mocked in unit tests.
    """
    team_dev_mode: str | None = os.environ.get("AIRFLOW__DEV__MULTI_TEAM_MODE")
    # An unset/empty variable and any value other than "enabled" must all block the
    # feature, so a single inequality check covers every case (the original
    # `not team_dev_mode or team_dev_mode != "enabled"` was redundant: a falsy value
    # can never equal "enabled").
    if team_dev_mode != "enabled":
        raise AirflowConfigException("Configuring multiple team based executors is not yet supported!")

@classmethod
def _get_team_executor_configs(cls) -> list[tuple[str, list[str]]]:
    """
    Return a list of executor configs to be loaded.

    Each tuple contains the team id as the first element and the second element is the
    executor config for that team (a list of executor names/modules/aliases).

    :raises AirflowConfigException: if multi-team mode is not enabled, or if an entry in
        ``multi_team_config_files`` is not of the form ``<config_file>:<team_id>``.
    """
    from airflow.configuration import conf

    team_config = conf.get("core", "multi_team_config_files", fallback=None)
    configs: list[tuple[str, list[str]]] = []
    if team_config:
        # Multi-team support is still under development; fail fast unless the dev
        # escape hatch is explicitly enabled.
        cls.block_use_of_multi_team()
        for team in team_config.split(","):
            # Each entry is expected to look like "<config_file>:<team_id>". Strip
            # whitespace so "a:t1, b:t2" style lists don't leak spaces into team ids,
            # and surface malformed entries as a config error rather than a bare
            # ValueError from tuple unpacking.
            try:
                (_, team_id) = team.strip().split(":")
            except ValueError:
                raise AirflowConfigException(
                    "Incorrectly formatted multi_team_config_files entry. Expected "
                    f"'<config_file>:<team_id>' but received: {team!r}"
                )
            configs.append((team_id, conf.get_mandatory_list_value("core", "executor", team_id=team_id)))
    return configs

@classmethod
def get_executor_names(cls) -> list[ExecutorName]:
"""
Expand Down Expand Up @@ -166,7 +217,7 @@ def init_executors(cls) -> list[BaseExecutor]:
executor_names = cls._get_executor_names()
loaded_executors = []
for executor_name in executor_names:
loaded_executor = cls.load_executor(executor_name.module_path)
loaded_executor = cls.load_executor(executor_name)
if executor_name.alias:
cls.executors[executor_name.alias] = executor_name.module_path
else:
Expand Down Expand Up @@ -216,7 +267,10 @@ def load_executor(cls, executor_name: ExecutorName | str | None) -> BaseExecutor
else:
executor_cls, import_source = cls.import_executor_cls(_executor_name)
log.debug("Loading executor %s from %s", _executor_name, import_source.value)
executor = executor_cls()
if _executor_name.team_id:
executor = executor_cls(team_id=_executor_name.team_id)
else:
executor = executor_cls()

except ImportError as e:
log.error(e)
Expand Down
25 changes: 16 additions & 9 deletions airflow/executors/executor_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,35 +23,42 @@
class ExecutorName(LoggingMixin):
"""Representation of an executor config/name."""

def __init__(self, module_path: str, alias: str | None = None, team_id: str | None = None) -> None:
    """
    Create an ExecutorName.

    :param module_path: Dotted module path of the executor class.
    :param alias: Optional short name (set for core executors or user-provided aliases).
    :param team_id: Optional id of the team this executor belongs to; ``None`` means it
        is a system level executor.
    """
    self.module_path: str = module_path
    self.alias: str | None = alias
    self.team_id: str | None = team_id
    # Derive connector_source (CORE vs CUSTOM_PATH) from the alias just set above.
    self.set_connector_source()

def set_connector_source(self) -> None:
    """Set ``connector_source`` based on whether the alias names a core executor."""
    if self.alias in CORE_EXECUTOR_NAMES:
        self.connector_source = ConnectorSource.CORE
    else:
        # Executor must be a module
        self.connector_source = ConnectorSource.CUSTOM_PATH

def __repr__(self) -> str:
    """Implement repr; format is ``<team_id>:<alias>:<module_path>`` with empty slots for unset parts."""
    if self.alias in CORE_EXECUTOR_NAMES:
        # This is a "core executor"; we can refer to it by its known short name, so the
        # module path slot is left empty.
        return f"{self.team_id if self.team_id else ''}:{self.alias}:"
    return (
        f"{self.team_id if self.team_id else ''}:"
        f"{self.alias if self.alias else ''}:"
        f"{self.module_path}"
    )

def __eq__(self, other) -> bool:
    """Implement eq; two names are equal when all identifying fields match."""
    # Returning NotImplemented (rather than letting an AttributeError escape) lets
    # Python fall back to its default comparison when compared against unrelated types.
    if not isinstance(other, ExecutorName):
        return NotImplemented
    return (
        self.alias == other.alias
        and self.module_path == other.module_path
        and self.connector_source == other.connector_source
        and self.team_id == other.team_id
    )

def __hash__(self) -> int:
    """Implement hash."""
    # Hash the repr so the hash stays consistent with __eq__, which compares the same
    # fields that the repr renders.
    return hash(repr(self))
4 changes: 2 additions & 2 deletions tests/cli/test_cli_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ def test_hybrid_executor_get_cli_commands_with_error(
assert celery_executor_command.name in commands
assert ecs_executor_command.name not in commands
assert (
"Failed to load CLI commands from executor: airflow.providers.amazon.aws.executors.ecs.ecs_executor.AwsEcsExecutor"
"Failed to load CLI commands from executor: ::airflow.providers.amazon.aws.executors.ecs.ecs_executor.AwsEcsExecutor"
in caplog.messages[0]
)

Expand Down Expand Up @@ -265,7 +265,7 @@ def test_cli_parser_fail_to_load_executor(self, ecs_executor_cli_commands_mock,
commands = [command.name for command in cli_parser.airflow_commands]
assert ecs_executor_command.name in commands
assert (
"Failed to load CLI commands from executor: airflow.providers.incorrect.executor.Executor"
"Failed to load CLI commands from executor: ::airflow.providers.incorrect.executor.Executor"
in caplog.messages[0]
)

Expand Down
Loading

0 comments on commit 957bd24

Please sign in to comment.