Skip to content

Commit

Permalink
Feature/array node workflow parallelism (#2268)
Browse files Browse the repository at this point in the history
* default array node concurrency to -1

Signed-off-by: Paul Dittamo <[email protected]>

* typo

Signed-off-by: Paul Dittamo <[email protected]>

* set default concurrency to None for backwards compatibility

Signed-off-by: Paul Dittamo <[email protected]>

* update unit test - hashed name

Signed-off-by: Paul Dittamo <[email protected]>

---------

Signed-off-by: Paul Dittamo <[email protected]>
Signed-off-by: Jan Fiedler <[email protected]>
  • Loading branch information
pvditt authored and fiedlerNr9 committed Jul 25, 2024
1 parent 3a892f6 commit 6982ea8
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 6 deletions.
5 changes: 3 additions & 2 deletions flytekit/core/array_node_map_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,7 @@ def _raw_execute(self, **kwargs) -> Any:

def map_task(
task_function: PythonFunctionTask,
concurrency: int = 0,
concurrency: Optional[int] = None,
# TODO why no min_successes?
min_success_ratio: float = 1.0,
**kwargs,
Expand All @@ -328,7 +328,8 @@ def map_task(
:param task_function: This argument is implicitly passed and represents the repeatable function
:param concurrency: If specified, this limits the number of mapped tasks than can run in parallel to the given batch
size. If the size of the input exceeds the concurrency value, then multiple batches will be run serially until
all inputs are processed. If left unspecified, this means unbounded concurrency.
all inputs are processed. If set to 0, this means unbounded concurrency. If left unspecified, this means the
array node will inherit parallelism from the workflow
:param min_success_ratio: If specified, this determines the minimum fraction of total jobs which can complete
successfully before terminating this task and marking it successful.
"""
Expand Down
8 changes: 4 additions & 4 deletions tests/flytekit/unit/core/test_array_node_map_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ def many_inputs(a: int, b: str, c: float) -> str:
assert m.python_interface.inputs == {"a": List[int], "b": List[str], "c": List[float]}
assert (
m.name
== "tests.flytekit.unit.core.test_array_node_map_task.map_many_inputs_bf51001578d0ae197a52c0af0a99dd89-arraynode"
== "tests.flytekit.unit.core.test_array_node_map_task.map_many_inputs_6b3bd0353da5de6e84d7982921ead2b3-arraynode"
)
r_m = ArrayNodeMapTask(many_inputs)
assert str(r_m.python_interface) == str(m.python_interface)
Expand All @@ -194,7 +194,7 @@ def many_inputs(a: int, b: str, c: float) -> str:
assert m.python_interface.inputs == {"a": List[int], "b": List[str], "c": float}
assert (
m.name
== "tests.flytekit.unit.core.test_array_node_map_task.map_many_inputs_cb470e880fabd6265ec80e29fe60250d-arraynode"
== "tests.flytekit.unit.core.test_array_node_map_task.map_many_inputs_7df6892fe8ce5343c76197a0b6127e80-arraynode"
)
r_m = ArrayNodeMapTask(many_inputs, bound_inputs=set("c"))
assert str(r_m.python_interface) == str(m.python_interface)
Expand All @@ -204,7 +204,7 @@ def many_inputs(a: int, b: str, c: float) -> str:
assert m.python_interface.inputs == {"a": List[int], "b": str, "c": float}
assert (
m.name
== "tests.flytekit.unit.core.test_array_node_map_task.map_many_inputs_316e10eb97f5d2abd585951048b807b9-arraynode"
== "tests.flytekit.unit.core.test_array_node_map_task.map_many_inputs_80fd21f14571026755b99d6b1c045089-arraynode"
)
r_m = ArrayNodeMapTask(many_inputs, bound_inputs={"c", "b"})
assert str(r_m.python_interface) == str(m.python_interface)
Expand All @@ -214,7 +214,7 @@ def many_inputs(a: int, b: str, c: float) -> str:
assert m.python_interface.inputs == {"a": int, "b": str, "c": float}
assert (
m.name
== "tests.flytekit.unit.core.test_array_node_map_task.map_many_inputs_758022acd59ad1c8b81670378d4de4f6-arraynode"
== "tests.flytekit.unit.core.test_array_node_map_task.map_many_inputs_5d2500dc176052a030efda3b8c283f96-arraynode"
)
r_m = ArrayNodeMapTask(many_inputs, bound_inputs={"a", "c", "b"})
assert str(r_m.python_interface) == str(m.python_interface)
Expand Down

0 comments on commit 6982ea8

Please sign in to comment.