Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding ability to run latest argo workflow template #303

Merged
merged 12 commits into from
Aug 20, 2024
10 changes: 4 additions & 6 deletions metaflow/plugins/aip/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,10 @@
)

from .argo_utils import (
run_argo_workflow,
run_id_to_url,
run_id_to_metaflow_url,
wait_for_argo_run_completion,
delete_argo_workflow,
to_metaflow_run_id,
ArgoHelper,
get_argo_url,
get_metaflow_url,
get_metaflow_run_id,
)

from .exit_handler_decorator import (
Expand Down
12 changes: 6 additions & 6 deletions metaflow/plugins/aip/aip_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@
check_metadata_service_version,
)
from metaflow.plugins.aip.argo_utils import (
run_id_to_url,
run_id_to_metaflow_url,
to_metaflow_run_id,
get_argo_url,
get_metaflow_url,
get_metaflow_run_id,
)
from metaflow.plugins.aip.aip_decorator import AIPException
from metaflow.plugins.aip.aip_step_init import save_step_environment_variables
Expand Down Expand Up @@ -421,9 +421,9 @@ def _echo_workflow_run(
):
argo_workflow_name = workflow_manifest["metadata"]["name"]
argo_workflow_uid = workflow_manifest["metadata"]["uid"]
metaflow_run_id = to_metaflow_run_id(argo_workflow_uid)
metaflow_ui_url = run_id_to_metaflow_url(flow_name, argo_workflow_uid)
argo_ui_url = run_id_to_url(
metaflow_run_id = get_metaflow_run_id(argo_workflow_uid)
metaflow_ui_url = get_metaflow_url(flow_name, argo_workflow_uid)
argo_ui_url = get_argo_url(
cloudw marked this conversation as resolved.
Show resolved Hide resolved
argo_workflow_name, kubernetes_namespace, argo_workflow_uid
)
obj.echo(f"Metaflow run_id=*{metaflow_run_id}*\n", fg="magenta")
Expand Down
6 changes: 2 additions & 4 deletions metaflow/plugins/aip/aip_exit_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from metaflow._vendor import click
import logging

from metaflow.plugins.aip import run_id_to_url
from metaflow.plugins.aip import get_argo_url

_logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -71,9 +71,7 @@ def email_notify(send_to):
email_body = get_env("METAFLOW_NOTIFY_EMAIL_BODY", "")
k8s_namespace = get_env("POD_NAMESPACE", "")

argo_ui_url = run_id_to_url(
argo_workflow_name, k8s_namespace, argo_workflow_uid
)
argo_ui_url = get_argo_url(argo_workflow_name, k8s_namespace, argo_workflow_uid)
body = (
f"status = {status} <br/>\n"
f"{argo_ui_url} <br/>\n"
Expand Down
4 changes: 2 additions & 2 deletions metaflow/plugins/aip/aip_udf_exit_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

from metaflow.decorators import flow_decorators, FlowDecorator
from metaflow.graph import FlowGraph
from metaflow.plugins.aip import run_id_to_url
from metaflow.plugins.aip import get_argo_url

_logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -44,7 +44,7 @@ def get_env(name, default=None) -> str:

argo_workflow_name = get_env("MF_ARGO_WORKFLOW_NAME", "")
k8s_namespace = get_env("POD_NAMESPACE", "")
argo_ui_url = run_id_to_url(argo_workflow_name, k8s_namespace, argo_workflow_uid)
argo_ui_url = get_argo_url(argo_workflow_name, k8s_namespace, argo_workflow_uid)

metaflow_configs: Dict[str, str] = json.loads(metaflow_configs_json)
metaflow_configs_new: Dict[str, str] = {
Expand Down
16 changes: 15 additions & 1 deletion metaflow/plugins/aip/argo_client.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# talebz copied from https://github.com/Netflix/metaflow/blob/master/metaflow/plugins/argo/argo_client.py

import json
from typing import Any, Dict, Optional
from typing import Any, Dict, Optional, List

from metaflow.exception import MetaflowException
from metaflow.plugins.aws.eks.kubernetes_client import KubernetesClient
Expand Down Expand Up @@ -83,6 +83,20 @@ def create_workflow_config_map(self, name: str, config_map: Dict[str, Any]):
json.loads(e.body)["message"] if e.body is not None else e.reason
)

def list_workflow_template(self, namespace: Optional[str] = None):
cloudw marked this conversation as resolved.
Show resolved Hide resolved
client = self._client.get()
try:
return client.CustomObjectsApi().list_namespaced_custom_object(
group=self._group,
version=self._version,
namespace=namespace or self._namespace,
plural="workflowtemplates",
)
except client.rest.ApiException as e:
raise ArgoClientException(
json.loads(e.body)["message"] if e.body is not None else e.reason
)

def trigger_workflow_template(self, name: str, parameters: Optional[Dict] = None):
client = self._client.get()
body = {
Expand Down
Loading
Loading