Skip to content

Commit

Permalink
Add ForLoop action to repeat entities with an index
Browse files Browse the repository at this point in the history
Signed-off-by: Christophe Bedard <[email protected]>
  • Loading branch information
christophebedard committed Nov 6, 2024
1 parent cb7b362 commit 593554a
Show file tree
Hide file tree
Showing 7 changed files with 519 additions and 0 deletions.
2 changes: 2 additions & 0 deletions launch/launch/actions/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from .emit_event import EmitEvent
from .execute_local import ExecuteLocal
from .execute_process import ExecuteProcess
from .for_loop import ForLoop
from .group_action import GroupAction
from .include_launch_description import IncludeLaunchDescription
from .log_info import LogInfo
Expand All @@ -45,6 +46,7 @@
'EmitEvent',
'ExecuteLocal',
'ExecuteProcess',
'ForLoop',
'GroupAction',
'IncludeLaunchDescription',
'LogInfo',
Expand Down
194 changes: 194 additions & 0 deletions launch/launch/actions/for_loop.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
# Copyright 2024 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Module for the ForLoop action."""

from copy import deepcopy
from typing import Callable
from typing import List
from typing import Optional
from typing import Text


from ..action import Action
from ..actions.opaque_function import OpaqueFunction
from ..frontend import Entity
from ..frontend import expose_action
from ..frontend import Parser
from ..launch_context import LaunchContext
from ..launch_description_entity import LaunchDescriptionEntity
from ..logging import get_logger
from ..some_substitutions_type import SomeSubstitutionsType
from ..substitution import Substitution
from ..utilities import perform_substitutions


@expose_action('for')
class ForLoop(Action):
"""
Action that instantiates entities through a function N times based on a launch argument.
A DeclareLaunchArgument must be created before this action to define the number of iterations
in the for-loop, i.e., N iterations. For each loop iteration, the provided callback function is
called with the index value, going from 0 to N (exclusive).
Simple example:
.. code-block:: python
def for_i(i: int):
return [
LogInfo(msg=['i=', str(i)]),
]
LaunchDescription([
DeclareLaunchArgument('num', default_value='2'),
ForLoop(LaunchConfiguration('num'), function=for_i),
])
When using this action through a frontend, provide entities to be instantiated for each loop
iteration as child entities. Use an $(index) substitution with the index name of the for-loop.
Simple example:
.. code-block:: xml
<launch>
<arg name="num" default="2" />
<for len="$(var num)" name="i" >
<log message="i=$(index i)" />
</for>
</launch>
The above examples would ouput the following log messages by default:
.. code-block:: text
i=0
i=1
If the launch argument was set to 5 (num:=5), then it would output:
.. code-block:: text
i=0
i=1
i=2
i=3
i=4
"""

def __init__(
self,
length: SomeSubstitutionsType,
*,
function: Callable[[int], Optional[List[LaunchDescriptionEntity]]],
name: Optional[SomeSubstitutionsType] = None,
**kwargs,
) -> None:
"""
Create a ForLoop.
:param length: the length of the for-loop
:param function: a function that receives an index value and returns entities
:param name: the for-loop name, used as the index name with the ForLoopIndex substitution
"""
super().__init__(**kwargs)

from ..utilities import normalize_to_list_of_substitutions # import here to avoid loop
self._length = normalize_to_list_of_substitutions(length)
self._function = function
self._name = normalize_to_list_of_substitutions(name) if name else []
self._logger = get_logger(__name__)

@property
def length(self) -> List[Substitution]:
return self._length

@property
def function(self) -> Callable[[int], Optional[List[LaunchDescriptionEntity]]]:
return self._function

@property
def name(self) -> List[Substitution]:
return self._name

def describe(self) -> Text:
return (
type(self).__name__ +
f"(length='{self._length}', name='{self._name}', function={self._function})"
)

@classmethod
def parse(cls, entity: Entity, parser: Parser):
"""Return `ForLoop` action and kwargs for constructing it."""
_, kwargs = super().parse(entity, parser)
length = entity.get_attr('len')
if length is not None:
kwargs['length'] = parser.parse_substitution(length)
name = entity.get_attr('name')
kwargs['name'] = name
parsed_children = [parser.parse_action(e) for e in entity.children]

def for_i(i: int):
return [
# Push and pop locals to avoid having the index local leak
OpaqueFunction(function=cls._push_locals),
# Set a local equal to i so that it can be used as a unique value by the entities
# through the ForLoopIndex substitution
OpaqueFunction(function=cls._set_index_local, args=(name, i)),
# Include a deep copy of parsed child entities
*deepcopy(parsed_children),
OpaqueFunction(function=cls._pop_locals),
]
kwargs['function'] = for_i
return cls, kwargs

def execute(self, context: LaunchContext) -> Optional[List[LaunchDescriptionEntity]]:
# Get the for-loop length and convert to int
length = int(perform_substitutions(context, self._length))
self._logger.debug(f'for-loop length={length}')

entities = []
for i in range(length):
i_entities = self._function(i)
if i_entities:
entities.extend(i_entities)
return entities

@classmethod
def _push_locals(
cls,
context: LaunchContext,
) -> Optional[List[LaunchDescriptionEntity]]:
context._push_locals()
return None

@classmethod
def _pop_locals(
cls,
context: LaunchContext,
) -> Optional[List[LaunchDescriptionEntity]]:
context._pop_locals()
return None

@classmethod
def _set_index_local(
cls,
context: LaunchContext,
local_name: str,
index: int,
) -> Optional[List[LaunchDescriptionEntity]]:
context.extend_locals({local_name: str(index)})
return None
2 changes: 2 additions & 0 deletions launch/launch/substitutions/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from .equals_substitution import EqualsSubstitution
from .file_content import FileContent
from .find_executable import FindExecutable
from .for_loop_index import ForLoopIndex
from .if_else_substitution import IfElseSubstitution
from .launch_configuration import LaunchConfiguration
from .launch_log_dir import LaunchLogDir
Expand All @@ -46,6 +47,7 @@
'EqualsSubstitution',
'EnvironmentVariable',
'FileContent',
'ForLoopIndex',
'FindExecutable',
'IfElseSubstitution',
'LaunchConfiguration',
Expand Down
70 changes: 70 additions & 0 deletions launch/launch/substitutions/for_loop_index.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
# Copyright 2024 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Module for the ForLoopIndex substitution."""

from typing import List
from typing import Sequence
from typing import Text

from .local_substitution import LocalSubstitution
from ..frontend import expose_substitution
from ..launch_context import LaunchContext
from ..logging import get_logger
from ..some_substitutions_type import SomeSubstitutionsType
from ..substitution import Substitution
from ..utilities import perform_substitutions


@expose_substitution('index')
class ForLoopIndex(Substitution):
"""Substitution for a ForLoop iteration index value."""

def __init__(
self,
name: SomeSubstitutionsType,
) -> None:
"""
Create a ForLoopIndex.
:param name: the name of the ForLoop index which this substitution is part of
"""
super().__init__()

from ..utilities import normalize_to_list_of_substitutions # import here to avoid loop
self._name = normalize_to_list_of_substitutions(name)
self._logger = get_logger(__name__)

@property
def name(self) -> List[Substitution]:
return self._name

def describe(self) -> Text:
return f"ForLoopIndex(name={' + '.join([sub.describe() for sub in self._name])})"

@classmethod
def parse(cls, data: Sequence[SomeSubstitutionsType]):
if len(data) != 1:
raise ValueError('ForLoopIndex substitution expects 1 argument')
kwargs = {}
kwargs['name'] = data[0]
return cls, kwargs

def perform(self, context: LaunchContext) -> Text:
name = perform_substitutions(context, self._name)
self._logger.debug('name=' + name)
index_substitution = LocalSubstitution(name)
index = perform_substitutions(context, [index_substitution])
self._logger.debug('index=' + index)
return index
89 changes: 89 additions & 0 deletions launch/test/launch/actions/test_for_loop.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
# Copyright 2024 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Tests for the ForLoop action."""

from typing import List

from launch import Action
from launch import LaunchContext
from launch.actions import DeclareLaunchArgument
from launch.actions import ForLoop
from launch.substitutions import LaunchConfiguration


def for_i(returned_entities, i_collector: List[int] = None):
def f(i: int):
if i_collector is not None:
i_collector.append(i)
return returned_entities
return f


def test_for_loop_constructors():
"""Test the constructors for the ForLoop class."""
ForLoop('2', function=for_i([]))
ForLoop(LaunchConfiguration('num'), function=for_i([]))


def test_for_loop_execute():
"""Test the execute() of the ForLoop class."""
context = LaunchContext()
i_values = []

result = ForLoop('0', function=for_i([], i_values)).visit(context)
assert len(result) == 0
assert i_values == []
i_values.clear()

result = ForLoop('0', function=for_i(None, i_values)).visit(context)
assert len(result) == 0
assert i_values == []
i_values.clear()

result = ForLoop('2', function=for_i([], i_values)).visit(context)
assert len(result) == 0
assert i_values == [0, 1]
i_values.clear()

result = ForLoop('0', function=for_i([Action()], i_values)).visit(context)
assert len(result) == 0
assert i_values == []
i_values.clear()

result = ForLoop('2', function=for_i([Action()], i_values)).visit(context)
assert len(result) == 2
assert isinstance(result[0], Action)
assert isinstance(result[1], Action)
assert i_values == [0, 1]
i_values.clear()

# Use launch arg, first with default value then non-default value
DeclareLaunchArgument('num', default_value='4').visit(context)
result = ForLoop(
LaunchConfiguration('num'),
function=for_i([Action()], i_values),
).visit(context)
assert len(result) == 4
assert i_values == [0, 1, 2, 3]
i_values.clear()
context.launch_configurations['num'] = '5'
result = ForLoop(
LaunchConfiguration('num'),
function=for_i([Action()], i_values),
).visit(context)
assert len(result) == 5
assert i_values == [0, 1, 2, 3, 4]
i_values.clear()
context.launch_configurations.clear()
Loading

0 comments on commit 593554a

Please sign in to comment.