Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding ability to run latest argo workflow template #303

Merged
merged 12 commits into from
Aug 20, 2024
16 changes: 15 additions & 1 deletion metaflow/plugins/aip/argo_client.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# talebz copied from https://github.com/Netflix/metaflow/blob/master/metaflow/plugins/argo/argo_client.py

import json
from typing import Any, Dict, Optional
from typing import Any, Dict, Optional, List

from metaflow.exception import MetaflowException
from metaflow.plugins.aws.eks.kubernetes_client import KubernetesClient
Expand Down Expand Up @@ -83,6 +83,20 @@ def create_workflow_config_map(self, name: str, config_map: Dict[str, Any]):
json.loads(e.body)["message"] if e.body is not None else e.reason
)

def list_workflow_template(self, namespace: Optional[str] = None):
cloudw marked this conversation as resolved.
Show resolved Hide resolved
client = self._client.get()
try:
return client.CustomObjectsApi().list_namespaced_custom_object(
group=self._group,
version=self._version,
namespace=namespace or self._namespace,
plural="workflowtemplates",
)
except client.rest.ApiException as e:
raise ArgoClientException(
json.loads(e.body)["message"] if e.body is not None else e.reason
)

def trigger_workflow_template(self, name: str, parameters: Optional[Dict] = None):
client = self._client.get()
body = {
Expand Down
76 changes: 72 additions & 4 deletions metaflow/plugins/aip/argo_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,38 @@

def run_argo_workflow(
kubernetes_namespace: str,
template_name: str,
template_name: Optional[str] = None,
project_name: Optional[str] = None,
branch_name: Optional[str] = None,
template_name_prefix: Optional[str] = None,
cloudw marked this conversation as resolved.
Show resolved Hide resolved
parameters: Optional[dict] = None,
wait_timeout: Union[int, float, datetime.timedelta] = 0,
**kwarg, # Other parameters for wait function
) -> Tuple[str, str]:
"""
Using template_name to trigger a workflow template name with exact match.

If no template_name is provided, the latest workflow template satisfying
the project_name, branch_name and template_name_prefix will be used.
All of these filters are optional. If not provided, the latest workflow
from the namespace will be used.
"""
client = ArgoClient(namespace=kubernetes_namespace)

if not template_name:
cloudw marked this conversation as resolved.
Show resolved Hide resolved
template_name = get_latest_workflow(
kubernetes_namespace,
project_name=project_name,
branch_name=branch_name,
template_name_prefix=template_name_prefix,
)
cloudw marked this conversation as resolved.
Show resolved Hide resolved

try:
# TODO(talebz): add tag of origin-run-id to correlate parent flow
workflow_manifest: Dict[str, Any] = ArgoClient(
namespace=kubernetes_namespace,
).trigger_workflow_template(template_name, parameters)
logger.info(f"Triggering workflow template: {template_name}")
workflow_manifest: Dict[str, Any] = client.trigger_workflow_template(
template_name, parameters
)
except Exception as e:
raise AIPException(str(e))

Expand All @@ -41,6 +63,52 @@ def run_argo_workflow(
return argo_run_id, argo_run_uid


def get_latest_workflow(
kubernetes_namespace: str,
project_name: Optional[str] = None,
branch_name: Optional[str] = None,
template_name_prefix: Optional[str] = None,
):
# TODO:
# - Add filter by project_id instead of project name - project_id is not added as a label yet.
# - Add filter by flow_name - flow_name is not added as a label yet.
client = ArgoClient(namespace=kubernetes_namespace)

templates = client.list_workflow_template()["items"]
templates = [
template
for template in templates
if (
not project_name
or template["metadata"]["labels"]["gitlab.zgtools.net/project-name"]
== project_name
)
and (
not branch_name
or template["metadata"]["labels"]["gitlab.zgtools.net/branch-name"]
== branch_name
)
and (
not template_name_prefix
or template["metadata"]["name"].startswith(template_name_prefix)
)
cloudw marked this conversation as resolved.
Show resolved Hide resolved
]
if not templates:
raise AIPException(
f"No workflow template found with constraints "
f"project_name={project_name}, branch_name={branch_name}, template_name_prefix={template_name_prefix}"
)
# Sort by creation timestamp to get the latest template.
templates.sort(
key=lambda template: template["metadata"]["creationTimestamp"], reverse=True
)
template_name = templates[1]["metadata"]["name"]
cloudw marked this conversation as resolved.
Show resolved Hide resolved
logger.info(
f"Found {len(templates)} WorkflowTemplates. Using latest workflow template: {template_name}"
)
return template_name


def delete_argo_workflow(
kubernetes_namespace: str,
template_name: str,
Expand Down
Loading