diff --git a/launch/launch/actions/__init__.py b/launch/launch/actions/__init__.py index ae0577e3..98da8e05 100644 --- a/launch/launch/actions/__init__.py +++ b/launch/launch/actions/__init__.py @@ -19,6 +19,7 @@ from .emit_event import EmitEvent from .execute_local import ExecuteLocal from .execute_process import ExecuteProcess +from .for_loop import ForLoop from .group_action import GroupAction from .include_launch_description import IncludeLaunchDescription from .log_info import LogInfo @@ -45,6 +46,7 @@ 'EmitEvent', 'ExecuteLocal', 'ExecuteProcess', + 'ForLoop', 'GroupAction', 'IncludeLaunchDescription', 'LogInfo', diff --git a/launch/launch/actions/for_loop.py b/launch/launch/actions/for_loop.py new file mode 100644 index 00000000..d4cd1199 --- /dev/null +++ b/launch/launch/actions/for_loop.py @@ -0,0 +1,194 @@ +# Copyright 2024 Open Source Robotics Foundation, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Module for the ForLoop action.""" + +from copy import deepcopy +from typing import Callable +from typing import List +from typing import Optional +from typing import Text + + +from ..action import Action +from ..actions.opaque_function import OpaqueFunction +from ..frontend import Entity +from ..frontend import expose_action +from ..frontend import Parser +from ..launch_context import LaunchContext +from ..launch_description_entity import LaunchDescriptionEntity +from ..logging import get_logger +from ..some_substitutions_type import SomeSubstitutionsType +from ..substitution import Substitution +from ..utilities import perform_substitutions + + +@expose_action('for') +class ForLoop(Action): + """ + Action that instantiates entities through a function N times based on a launch argument. + + A DeclareLaunchArgument must be created before this action to define the number of iterations + in the for-loop, i.e., N iterations. For each loop iteration, the provided callback function is + called with the index value, going from 0 to N (exclusive). + + Simple example: + + .. code-block:: python + + def for_i(i: int): + return [ + LogInfo(msg=['i=', str(i)]), + ] + + LaunchDescription([ + DeclareLaunchArgument('num', default_value='2'), + ForLoop(LaunchConfiguration('num'), function=for_i), + ]) + + When using this action through a frontend, provide entities to be instantiated for each loop + iteration as child entities. Use an $(index) substitution with the name of the for-loop length. + + Simple example: + + .. code-block:: xml + + + + + + + + + The above examples would ouput the following log messages by default: + + .. code-block:: text + + i=0 + i=1 + + If the launch argument was set to 5 (num:=5), then it would output: + + .. code-block:: text + + i=0 + i=1 + i=2 + i=3 + i=4 + """ + + def __init__( + self, + length: SomeSubstitutionsType, + *, + function: Callable[[int], Optional[List[LaunchDescriptionEntity]]], + name: Optional[SomeSubstitutionsType] = None, + **kwargs, + ) -> None: + """ + Create a ForLoop. + + :param length: the length of the for-loop + :param function: a function that receives an index value and returns entities + :param name: the for-loop name, used as the index name with the ForLoopIndex substitution + """ + super().__init__(**kwargs) + + from ..utilities import normalize_to_list_of_substitutions # import here to avoid loop + self._length = normalize_to_list_of_substitutions(length) + self._function = function + self._name = normalize_to_list_of_substitutions(name) if name else [] + self._logger = get_logger(__name__) + + @property + def length(self) -> List[Substitution]: + return self._length + + @property + def function(self) -> Callable[[int], Optional[List[LaunchDescriptionEntity]]]: + return self._function + + @property + def name(self) -> List[Substitution]: + return self._name + + def describe(self) -> Text: + return ( + type(self).__name__ + + f"(length='{self._length}', name='{self._name}', function={self._function})" + ) + + @classmethod + def parse(cls, entity: Entity, parser: Parser): + """Return `ForLoop` action and kwargs for constructing it.""" + _, kwargs = super().parse(entity, parser) + length = entity.get_attr('len') + if length is not None: + kwargs['length'] = parser.parse_substitution(length) + name = entity.get_attr('name') + kwargs['name'] = name + parsed_children = [parser.parse_action(e) for e in entity.children] + + def for_i(i: int): + return [ + # Push and pop locals to avoid having the index local leak + OpaqueFunction(function=cls._push_locals), + # Set a local equal to i so that it can be used as a unique value by the entities + # through the ForLoopIndex substitution + OpaqueFunction(function=cls._set_index_local, args=(name, i)), + # Include a deep copy of parsed child entities + *deepcopy(parsed_children), + OpaqueFunction(function=cls._pop_locals), + ] + kwargs['function'] = for_i + return cls, kwargs + + def execute(self, context: LaunchContext) -> Optional[List[LaunchDescriptionEntity]]: + # Get the for-loop length and convert to int + length = int(perform_substitutions(context, self._length)) + self._logger.debug(f'for-loop length={length}') + + entities = [] + for i in range(length): + i_entities = self._function(i) + if i_entities: + entities.extend(i_entities) + return entities + + @classmethod + def _push_locals( + cls, + context: LaunchContext, + ) -> Optional[List[LaunchDescriptionEntity]]: + context._push_locals() + return None + + @classmethod + def _pop_locals( + cls, + context: LaunchContext, + ) -> Optional[List[LaunchDescriptionEntity]]: + context._pop_locals() + return None + + @classmethod + def _set_index_local( + cls, + context: LaunchContext, + local_name: str, + index: int, + ) -> Optional[List[LaunchDescriptionEntity]]: + context.extend_locals({local_name: str(index)}) + return None diff --git a/launch/launch/substitutions/__init__.py b/launch/launch/substitutions/__init__.py index 1622deba..b698bbfe 100644 --- a/launch/launch/substitutions/__init__.py +++ b/launch/launch/substitutions/__init__.py @@ -25,6 +25,7 @@ from .equals_substitution import EqualsSubstitution from .file_content import FileContent from .find_executable import FindExecutable +from .for_loop_index import ForLoopIndex from .if_else_substitution import IfElseSubstitution from .launch_configuration import LaunchConfiguration from .launch_log_dir import LaunchLogDir @@ -46,6 +47,7 @@ 'EqualsSubstitution', 'EnvironmentVariable', 'FileContent', + 'ForLoopIndex', 'FindExecutable', 'IfElseSubstitution', 'LaunchConfiguration', diff --git a/launch/launch/substitutions/for_loop_index.py b/launch/launch/substitutions/for_loop_index.py new file mode 100644 index 00000000..005a3188 --- /dev/null +++ b/launch/launch/substitutions/for_loop_index.py @@ -0,0 +1,70 @@ +# Copyright 2024 Open Source Robotics Foundation, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Module for the ForLoopIndex substitution.""" + +from typing import List +from typing import Sequence +from typing import Text + +from .local_substitution import LocalSubstitution +from ..frontend import expose_substitution +from ..launch_context import LaunchContext +from ..logging import get_logger +from ..some_substitutions_type import SomeSubstitutionsType +from ..substitution import Substitution +from ..utilities import perform_substitutions + + +@expose_substitution('index') +class ForLoopIndex(Substitution): + """Substitution for a ForLoop iteration index value.""" + + def __init__( + self, + name: SomeSubstitutionsType, + ) -> None: + """ + Create a ForLoopIndex. + + :param name: the name of the ForLoop index which this substitution is part of + """ + super().__init__() + + from ..utilities import normalize_to_list_of_substitutions # import here to avoid loop + self._name = normalize_to_list_of_substitutions(name) + self._logger = get_logger(__name__) + + @property + def name(self) -> List[Substitution]: + return self._name + + def describe(self) -> Text: + return f"ForLoopIndex(name={' + '.join([sub.describe() for sub in self._name])})" + + @classmethod + def parse(cls, data: Sequence[SomeSubstitutionsType]): + if len(data) != 1: + raise ValueError('ForLoopIndex substitution expects 1 argument') + kwargs = {} + kwargs['name'] = data[0] + return cls, kwargs + + def perform(self, context: LaunchContext) -> Text: + name = perform_substitutions(context, self._name) + self._logger.debug('name=' + name) + index_substitution = LocalSubstitution(name) + index = perform_substitutions(context, [index_substitution]) + self._logger.debug('index=' + index) + return index