From 38bbc7238ac5a06193ce4a1be0ad964362d1f4e3 Mon Sep 17 00:00:00 2001 From: Da-Yi Wu Date: Fri, 29 Sep 2023 13:54:41 +0800 Subject: [PATCH 1/3] Add update_execution func in remote Signed-off-by: Da-Yi Wu --- flytekit/clients/friendly.py | 15 +++++++++++++++ flytekit/remote/remote.py | 29 +++++++++++++++++++++++++++++ 2 files changed, 44 insertions(+) diff --git a/flytekit/clients/friendly.py b/flytekit/clients/friendly.py index d6d0581b2a..5d7078123c 100644 --- a/flytekit/clients/friendly.py +++ b/flytekit/clients/friendly.py @@ -560,6 +560,21 @@ def create_execution(self, project, domain, name, execution_spec, inputs): .id ) + def update_execution(self, id, state = None, tags: typing.List[str] = None): + """ + This will update an execution for the given execution spec. + :param flytekit.models.core.identifier.WorkflowExecutionIdentifier id: + :param state: Optional state to update the execution to. + :param list[str] tags: Optional tags to update the execution with. + """ + return super(SynchronousFlyteClient, self).update_execution( + _execution_pb2.ExecutionUpdateRequest( + id = id.to_flyte_idl(), + state = state, + tags = tags, + ) + ) + def recover_execution(self, id, name: str = None): """ Recreates a previously-run workflow execution that will only start executing from the last known failure point. diff --git a/flytekit/remote/remote.py b/flytekit/remote/remote.py index 78fa3271e7..5e7ad6fc8a 100644 --- a/flytekit/remote/remote.py +++ b/flytekit/remote/remote.py @@ -345,6 +345,7 @@ def fetch_workflow( """ if name is None: raise user_exceptions.FlyteAssertion("the 'name' argument must be specified.") + workflow_id = _get_entity_identifier( self.client.list_workflows_paginated, ResourceType.WORKFLOW, @@ -427,6 +428,34 @@ def fetch_execution(self, project: str = None, domain: str = None, name: str = N ) ) return self.sync_execution(execution) + + ###################### + # Update Execution # + ###################### + + def update_execution(self, project: str = None, domain: str = None, name: str = None, state = None, tags: typing.List[str] = None) -> FlyteWorkflowExecution: + """Update a workflow execution entity from flyte admin. + + :param project: update entity from this project. If None, uses the default_project attribute. + :param domain: update entity from this domain. If None, uses the default_domain attribute. + :param name: update entity with matching name. + :param state: state to be updated + :param tags: tags to be updated + :returns: :class:`~flytekit.remote.workflow_execution.FlyteWorkflowExecution` + + :raises: FlyteAssertion if name is None + """ + if name is None: + raise user_exceptions.FlyteAssertion("the 'name' argument must be specified.") + self.client.update_execution( + WorkflowExecutionIdentifier( + project or self.default_project, + domain or self.default_domain, + name, + ), + state, + tags, + ) ###################### # Listing Entities # From 60681d49099307dfc2b0555d32b6a8f0e3c47886 Mon Sep 17 00:00:00 2001 From: Da-Yi Wu Date: Thu, 5 Oct 2023 10:16:20 +0800 Subject: [PATCH 2/3] UpdateTag make lint Signed-off-by: Da-Yi Wu --- flytekit/clients/friendly.py | 14 +++++++------- flytekit/remote/remote.py | 6 ++++-- 2 files changed, 11 insertions(+), 9 deletions(-) diff --git a/flytekit/clients/friendly.py b/flytekit/clients/friendly.py index 5d7078123c..1b69f2f65b 100644 --- a/flytekit/clients/friendly.py +++ b/flytekit/clients/friendly.py @@ -560,7 +560,7 @@ def create_execution(self, project, domain, name, execution_spec, inputs): .id ) - def update_execution(self, id, state = None, tags: typing.List[str] = None): + def update_execution(self, id, state=None, tags: typing.List[str] = None): """ This will update an execution for the given execution spec. :param flytekit.models.core.identifier.WorkflowExecutionIdentifier id: @@ -568,13 +568,13 @@ def update_execution(self, id, state = None, tags: typing.List[str] = None): :param list[str] tags: Optional tags to update the execution with. """ return super(SynchronousFlyteClient, self).update_execution( - _execution_pb2.ExecutionUpdateRequest( - id = id.to_flyte_idl(), - state = state, - tags = tags, - ) + _execution_pb2.ExecutionUpdateRequest( + id=id.to_flyte_idl(), + state=state, + tags=tags, ) - + ) + def recover_execution(self, id, name: str = None): """ Recreates a previously-run workflow execution that will only start executing from the last known failure point. diff --git a/flytekit/remote/remote.py b/flytekit/remote/remote.py index 5e7ad6fc8a..16f6794176 100644 --- a/flytekit/remote/remote.py +++ b/flytekit/remote/remote.py @@ -428,12 +428,14 @@ def fetch_execution(self, project: str = None, domain: str = None, name: str = N ) ) return self.sync_execution(execution) - + ###################### # Update Execution # ###################### - def update_execution(self, project: str = None, domain: str = None, name: str = None, state = None, tags: typing.List[str] = None) -> FlyteWorkflowExecution: + def update_execution( + self, project: str = None, domain: str = None, name: str = None, state=None, tags: typing.List[str] = None + ) -> FlyteWorkflowExecution: """Update a workflow execution entity from flyte admin. :param project: update entity from this project. If None, uses the default_project attribute. From 9f8b55568551b3d626a64488f1643a28a8f7678a Mon Sep 17 00:00:00 2001 From: Da-Yi Wu Date: Sun, 8 Oct 2023 12:34:38 +0800 Subject: [PATCH 3/3] Add typing to state Signed-off-by: Da-Yi Wu --- flytekit/clients/friendly.py | 3 ++- flytekit/clients/raw.py | 9 +++++++++ flytekit/remote/remote.py | 11 ++++++++--- 3 files changed, 19 insertions(+), 4 deletions(-) diff --git a/flytekit/clients/friendly.py b/flytekit/clients/friendly.py index 1b69f2f65b..6388313379 100644 --- a/flytekit/clients/friendly.py +++ b/flytekit/clients/friendly.py @@ -560,13 +560,14 @@ def create_execution(self, project, domain, name, execution_spec, inputs): .id ) - def update_execution(self, id, state=None, tags: typing.List[str] = None): + def update_execution(self, id, state: typing.Literal[0, 1] = None, tags: typing.List[str] = None): """ This will update an execution for the given execution spec. :param flytekit.models.core.identifier.WorkflowExecutionIdentifier id: :param state: Optional state to update the execution to. :param list[str] tags: Optional tags to update the execution with. """ + return super(SynchronousFlyteClient, self).update_execution( _execution_pb2.ExecutionUpdateRequest( id=id.to_flyte_idl(), diff --git a/flytekit/clients/raw.py b/flytekit/clients/raw.py index 6cb80d4b8f..d408add5ae 100644 --- a/flytekit/clients/raw.py +++ b/flytekit/clients/raw.py @@ -374,6 +374,15 @@ def get_execution(self, get_object_request): """ return self._stub.GetExecution(get_object_request, metadata=self._metadata) + def update_execution(self, update_execution_request): + """ + Updates an execution of a workflow entity. + + :param flyteidl.admin.execution_pb2.ExecutionUpdateRequest update_execution_request: + :rtype: flyteidl.admin.execution_pb2.ExecutionUpdateResponse + """ + return self._stub.UpdateExecution(update_execution_request, metadata=self._metadata) + def get_execution_data(self, get_execution_data_request): """ Returns signed URLs to LiteralMap blobs for an execution's inputs and outputs (when available). diff --git a/flytekit/remote/remote.py b/flytekit/remote/remote.py index 16f6794176..3892c20a9e 100644 --- a/flytekit/remote/remote.py +++ b/flytekit/remote/remote.py @@ -434,15 +434,20 @@ def fetch_execution(self, project: str = None, domain: str = None, name: str = N ###################### def update_execution( - self, project: str = None, domain: str = None, name: str = None, state=None, tags: typing.List[str] = None + self, + project: str = None, + domain: str = None, + name: str = None, + state: typing.Literal[0, 1] = None, + tags: typing.List[str] = None, ) -> FlyteWorkflowExecution: """Update a workflow execution entity from flyte admin. :param project: update entity from this project. If None, uses the default_project attribute. :param domain: update entity from this domain. If None, uses the default_domain attribute. :param name: update entity with matching name. - :param state: state to be updated - :param tags: tags to be updated + :param state: state to be updated. 0 = Active, 1 = Archived. + :param tags: List of tags to be updated. :returns: :class:`~flytekit.remote.workflow_execution.FlyteWorkflowExecution` :raises: FlyteAssertion if name is None