From 1fae040c5301a80364bfd940d15d8c5db0a0b5ca Mon Sep 17 00:00:00 2001 From: Paul Dittamo Date: Sun, 26 Jan 2025 21:12:30 -0800 Subject: [PATCH 1/5] don't override timeout on with_overrides if not specified Signed-off-by: Paul Dittamo --- flytekit/core/node.py | 24 +++++++----- flytekit/core/promise.py | 2 +- .../flytekit/unit/core/test_node_creation.py | 39 ++++++++++++++++--- 3 files changed, 48 insertions(+), 17 deletions(-) diff --git a/flytekit/core/node.py b/flytekit/core/node.py index 61ae41c060..c1718cc3cc 100644 --- a/flytekit/core/node.py +++ b/flytekit/core/node.py @@ -47,6 +47,8 @@ class Node(object): ID, which from the registration step """ + timeout_override_sentinel = object() + def __init__( self, id: str, @@ -127,7 +129,7 @@ def metadata(self) -> _workflow_model.NodeMetadata: def _override_node_metadata( self, name, - timeout: Optional[Union[int, datetime.timedelta]] = None, + timeout: Optional[Union[int, datetime.timedelta, None]] = timeout_override_sentinel, retries: Optional[int] = None, interruptible: typing.Optional[bool] = None, cache: typing.Optional[bool] = None, @@ -142,14 +144,16 @@ def _override_node_metadata( else: node_metadata = self._metadata - if timeout is None: - node_metadata._timeout = datetime.timedelta() - elif isinstance(timeout, int): - node_metadata._timeout = datetime.timedelta(seconds=timeout) - elif isinstance(timeout, datetime.timedelta): - node_metadata._timeout = timeout - else: - raise ValueError("timeout should be duration represented as either a datetime.timedelta or int seconds") + if timeout is not Node.timeout_override_sentinel: + if timeout is None: + node_metadata._timeout = 0 + elif isinstance(timeout, int): + node_metadata._timeout = datetime.timedelta(seconds=timeout) + elif isinstance(timeout, datetime.timedelta): + node_metadata._timeout = timeout + else: + raise ValueError("timeout should be duration represented as either a datetime.timedelta or int seconds") + if retries is not None: assert_not_promise(retries, "retries") node_metadata._retries = ( @@ -181,7 +185,7 @@ def with_overrides( aliases: Optional[Dict[str, str]] = None, requests: Optional[Resources] = None, limits: Optional[Resources] = None, - timeout: Optional[Union[int, datetime.timedelta]] = None, + timeout: Optional[Union[int, datetime.timedelta, None]] = timeout_override_sentinel, retries: Optional[int] = None, interruptible: Optional[bool] = None, name: Optional[str] = None, diff --git a/flytekit/core/promise.py b/flytekit/core/promise.py index d64f2461e5..49bc14b83b 100644 --- a/flytekit/core/promise.py +++ b/flytekit/core/promise.py @@ -586,7 +586,7 @@ def with_overrides( aliases: Optional[Dict[str, str]] = None, requests: Optional[Resources] = None, limits: Optional[Resources] = None, - timeout: Optional[Union[int, datetime.timedelta]] = None, + timeout: Optional[Union[int, datetime.timedelta, None]] = Node.timeout_override_sentinel, retries: Optional[int] = None, interruptible: Optional[bool] = None, name: Optional[str] = None, diff --git a/tests/flytekit/unit/core/test_node_creation.py b/tests/flytekit/unit/core/test_node_creation.py index 381f456bdb..51686b7385 100644 --- a/tests/flytekit/unit/core/test_node_creation.py +++ b/tests/flytekit/unit/core/test_node_creation.py @@ -302,18 +302,42 @@ def my_wf(a: typing.List[str]) -> typing.List[str]: ] +preset_timeout = datetime.timedelta(seconds=100) + + @pytest.mark.parametrize( - "timeout,expected", - [(None, datetime.timedelta()), (10, datetime.timedelta(seconds=10))], + "timeout,t1_expected_timeout_overridden, t1_expected_timeout_unset, t2_expected_timeout_overridden, " + "t2_expected_timeout_unset", + [ + (None, 0, 0, 0, preset_timeout), + (10, datetime.timedelta(seconds=10), 0, + datetime.timedelta(seconds=10), preset_timeout) + ], ) -def test_timeout_override(timeout, expected): +def test_timeout_override( + timeout, + t1_expected_timeout_overridden, + t1_expected_timeout_unset, + t2_expected_timeout_overridden, + t2_expected_timeout_unset, + ): @task def t1(a: str) -> str: return f"*~*~*~{a}*~*~*~" + @task( + timeout=preset_timeout + ) + def t2(a: str) -> str: + return f"*~*~*~{a}*~*~*~" + @workflow def my_wf(a: str) -> str: - return t1(a=a).with_overrides(timeout=timeout) + s = t1(a=a).with_overrides(timeout=timeout) + s1 = t1(a=s).with_overrides() + s2 = t2(a=s1).with_overrides(timeout=timeout) + s3 = t2(a=s2).with_overrides() + return s3 serialization_settings = flytekit.configuration.SerializationSettings( project="test_proj", @@ -323,8 +347,11 @@ def my_wf(a: str) -> str: env={}, ) wf_spec = get_serializable(OrderedDict(), serialization_settings, my_wf) - assert len(wf_spec.template.nodes) == 1 - assert wf_spec.template.nodes[0].metadata.timeout == expected + assert len(wf_spec.template.nodes) == 4 + assert wf_spec.template.nodes[0].metadata.timeout == t1_expected_timeout_overridden + assert wf_spec.template.nodes[1].metadata.timeout == t1_expected_timeout_unset + assert wf_spec.template.nodes[2].metadata.timeout == t2_expected_timeout_overridden + assert wf_spec.template.nodes[3].metadata.timeout == t2_expected_timeout_unset def test_timeout_override_invalid_value(): From f205c4165269d42472bf7af2dec71bb8aa34a363 Mon Sep 17 00:00:00 2001 From: Paul Dittamo Date: Sun, 26 Jan 2025 22:11:15 -0800 Subject: [PATCH 2/5] lint Signed-off-by: Paul Dittamo --- flytekit/core/node.py | 4 ++-- flytekit/core/promise.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/flytekit/core/node.py b/flytekit/core/node.py index c1718cc3cc..6c4e94c9c5 100644 --- a/flytekit/core/node.py +++ b/flytekit/core/node.py @@ -129,7 +129,7 @@ def metadata(self) -> _workflow_model.NodeMetadata: def _override_node_metadata( self, name, - timeout: Optional[Union[int, datetime.timedelta, None]] = timeout_override_sentinel, + timeout: Optional[Union[int, datetime.timedelta, object]] = timeout_override_sentinel, retries: Optional[int] = None, interruptible: typing.Optional[bool] = None, cache: typing.Optional[bool] = None, @@ -185,7 +185,7 @@ def with_overrides( aliases: Optional[Dict[str, str]] = None, requests: Optional[Resources] = None, limits: Optional[Resources] = None, - timeout: Optional[Union[int, datetime.timedelta, None]] = timeout_override_sentinel, + timeout: Optional[Union[int, datetime.timedelta, object]] = timeout_override_sentinel, retries: Optional[int] = None, interruptible: Optional[bool] = None, name: Optional[str] = None, diff --git a/flytekit/core/promise.py b/flytekit/core/promise.py index 49bc14b83b..751d632059 100644 --- a/flytekit/core/promise.py +++ b/flytekit/core/promise.py @@ -586,7 +586,7 @@ def with_overrides( aliases: Optional[Dict[str, str]] = None, requests: Optional[Resources] = None, limits: Optional[Resources] = None, - timeout: Optional[Union[int, datetime.timedelta, None]] = Node.timeout_override_sentinel, + timeout: Optional[Union[int, datetime.timedelta, object]] = Node.timeout_override_sentinel, retries: Optional[int] = None, interruptible: Optional[bool] = None, name: Optional[str] = None, From 4276012e24ba3de7e913112c717ffe10db564c4a Mon Sep 17 00:00:00 2001 From: Paul Dittamo Date: Wed, 5 Feb 2025 16:02:16 -0800 Subject: [PATCH 3/5] clean up Signed-off-by: Paul Dittamo --- flytekit/core/node.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flytekit/core/node.py b/flytekit/core/node.py index 6c4e94c9c5..c1c75c1a80 100644 --- a/flytekit/core/node.py +++ b/flytekit/core/node.py @@ -47,7 +47,7 @@ class Node(object): ID, which from the registration step """ - timeout_override_sentinel = object() + TIMEOUT_OVERRIDE_SENTINEL = object() def __init__( self, From 1e6bb36fab083e06177351c935cd63549df97228 Mon Sep 17 00:00:00 2001 From: Paul Dittamo Date: Wed, 5 Feb 2025 16:05:23 -0800 Subject: [PATCH 4/5] clean up Signed-off-by: Paul Dittamo --- flytekit/core/node.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/flytekit/core/node.py b/flytekit/core/node.py index c1c75c1a80..1c16539b92 100644 --- a/flytekit/core/node.py +++ b/flytekit/core/node.py @@ -129,7 +129,7 @@ def metadata(self) -> _workflow_model.NodeMetadata: def _override_node_metadata( self, name, - timeout: Optional[Union[int, datetime.timedelta, object]] = timeout_override_sentinel, + timeout: Optional[Union[int, datetime.timedelta, object]] = TIMEOUT_OVERRIDE_SENTINEL, retries: Optional[int] = None, interruptible: typing.Optional[bool] = None, cache: typing.Optional[bool] = None, @@ -144,7 +144,7 @@ def _override_node_metadata( else: node_metadata = self._metadata - if timeout is not Node.timeout_override_sentinel: + if timeout is not Node.TIMEOUT_OVERRIDE_SENTINEL: if timeout is None: node_metadata._timeout = 0 elif isinstance(timeout, int): @@ -185,7 +185,7 @@ def with_overrides( aliases: Optional[Dict[str, str]] = None, requests: Optional[Resources] = None, limits: Optional[Resources] = None, - timeout: Optional[Union[int, datetime.timedelta, object]] = timeout_override_sentinel, + timeout: Optional[Union[int, datetime.timedelta, object]] = TIMEOUT_OVERRIDE_SENTINEL, retries: Optional[int] = None, interruptible: Optional[bool] = None, name: Optional[str] = None, From f43f286250f20eed1152c22904def4c3d235ae3e Mon Sep 17 00:00:00 2001 From: Paul Dittamo Date: Wed, 5 Feb 2025 16:18:17 -0800 Subject: [PATCH 5/5] clean up Signed-off-by: Paul Dittamo --- flytekit/core/promise.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flytekit/core/promise.py b/flytekit/core/promise.py index 751d632059..76d817ffeb 100644 --- a/flytekit/core/promise.py +++ b/flytekit/core/promise.py @@ -586,7 +586,7 @@ def with_overrides( aliases: Optional[Dict[str, str]] = None, requests: Optional[Resources] = None, limits: Optional[Resources] = None, - timeout: Optional[Union[int, datetime.timedelta, object]] = Node.timeout_override_sentinel, + timeout: Optional[Union[int, datetime.timedelta, object]] = Node.TIMEOUT_OVERRIDE_SENTINEL, retries: Optional[int] = None, interruptible: Optional[bool] = None, name: Optional[str] = None,