Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add update_execution func in remote #1860

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions flytekit/clients/friendly.py
Original file line number Diff line number Diff line change
Expand Up @@ -560,6 +560,22 @@ def create_execution(self, project, domain, name, execution_spec, inputs):
.id
)

def update_execution(self, id, state: typing.Literal[0, 1] = None, tags: typing.List[str] = None):
"""
This will update an execution for the given execution spec.
:param flytekit.models.core.identifier.WorkflowExecutionIdentifier id:
:param state: Optional state to update the execution to.
:param list[str] tags: Optional tags to update the execution with.
"""

return super(SynchronousFlyteClient, self).update_execution(
_execution_pb2.ExecutionUpdateRequest(
id=id.to_flyte_idl(),
state=state,
tags=tags,
)
)

def recover_execution(self, id, name: str = None):
"""
Recreates a previously-run workflow execution that will only start executing from the last known failure point.
Expand Down
9 changes: 9 additions & 0 deletions flytekit/clients/raw.py
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,15 @@ def get_execution(self, get_object_request):
"""
return self._stub.GetExecution(get_object_request, metadata=self._metadata)

def update_execution(self, update_execution_request):
"""
Updates an execution of a workflow entity.

:param flyteidl.admin.execution_pb2.ExecutionUpdateRequest update_execution_request:
:rtype: flyteidl.admin.execution_pb2.ExecutionUpdateResponse
"""
return self._stub.UpdateExecution(update_execution_request, metadata=self._metadata)

def get_execution_data(self, get_execution_data_request):
"""
Returns signed URLs to LiteralMap blobs for an execution's inputs and outputs (when available).
Expand Down
36 changes: 36 additions & 0 deletions flytekit/remote/remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,7 @@ def fetch_workflow(
"""
if name is None:
raise user_exceptions.FlyteAssertion("the 'name' argument must be specified.")

workflow_id = _get_entity_identifier(
self.client.list_workflows_paginated,
ResourceType.WORKFLOW,
Expand Down Expand Up @@ -428,6 +429,41 @@ def fetch_execution(self, project: str = None, domain: str = None, name: str = N
)
return self.sync_execution(execution)

######################
# Update Execution #
######################

def update_execution(
self,
project: str = None,
domain: str = None,
name: str = None,
state: typing.Literal[0, 1] = None,
tags: typing.List[str] = None,
) -> FlyteWorkflowExecution:
"""Update a workflow execution entity from flyte admin.

:param project: update entity from this project. If None, uses the default_project attribute.
:param domain: update entity from this domain. If None, uses the default_domain attribute.
:param name: update entity with matching name.
:param state: state to be updated. 0 = Active, 1 = Archived.
:param tags: List of tags to be updated.
:returns: :class:`~flytekit.remote.workflow_execution.FlyteWorkflowExecution`

:raises: FlyteAssertion if name is None
"""
if name is None:
raise user_exceptions.FlyteAssertion("the 'name' argument must be specified.")
self.client.update_execution(
WorkflowExecutionIdentifier(
project or self.default_project,
domain or self.default_domain,
name,
),
state,
tags,
)

######################
# Listing Entities #
######################
Expand Down