Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding ability to run latest argo workflow template #303

Merged
merged 12 commits into from
Aug 20, 2024
4 changes: 2 additions & 2 deletions metaflow/metaflow_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,8 +151,8 @@ def from_conf(name, default=None):
# Note: `ARGO_RUN_URL_PREFIX` is the URL prefix for ARGO runs on your ARGO cluster. The prefix includes
# all parts of the URL except the run_id at the end which we append once the run is created.
# For eg, this would look like: "https://<your-kf-cluster-url>/argo-ui/workflows/
ARGO_RUN_URL_PREFIX = from_conf("ARGO_RUN_URL_PREFIX", "")
METAFLOW_RUN_URL_PREFIX = from_conf("METAFLOW_RUN_URL_PREFIX", "")
ARGO_RUN_URL_PREFIX = from_conf("ARGO_RUN_URL_PREFIX", "").rstrip("/")
METAFLOW_RUN_URL_PREFIX = from_conf("METAFLOW_RUN_URL_PREFIX", "").rstrip("/")
cloudw marked this conversation as resolved.
Show resolved Hide resolved
AIP_MAX_PARALLELISM = int(from_conf("AIP_MAX_PARALLELISM", 10))
AIP_MAX_RUN_CONCURRENCY = int(from_conf("AIP_MAX_RUN_CONCURRENCY", 10))
AIP_SHOW_METAFLOW_UI_URL = bool(from_conf("AIP_SHOW_METAFLOW_UI_URL", False))
Expand Down
10 changes: 4 additions & 6 deletions metaflow/plugins/aip/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,10 @@
)

from .argo_utils import (
run_argo_workflow,
run_id_to_url,
run_id_to_metaflow_url,
wait_for_argo_run_completion,
delete_argo_workflow,
to_metaflow_run_id,
ArgoHelper,
get_argo_url,
get_metaflow_url,
get_metaflow_run_id,
)

from .exit_handler_decorator import (
Expand Down
5 changes: 2 additions & 3 deletions metaflow/plugins/aip/aip.py
Original file line number Diff line number Diff line change
Expand Up @@ -291,10 +291,9 @@ def _create_workflow_yaml(
# Note the name has to follow k8s format.
# self.name is typically CamelCase as it's python class name.
# generateName contains a sanitized version of self.name from aip.compiler
default_workflow_name = workflow["metadata"].pop("generateName").rstrip("-")
cloudw marked this conversation as resolved.
Show resolved Hide resolved
workflow["metadata"]["name"] = (
sanitize_k8s_name(name)
if name
else workflow["metadata"].pop("generateName").rstrip("-")
sanitize_k8s_name(name) if name else default_workflow_name
)

# Service account is added through webhooks.
Expand Down
12 changes: 6 additions & 6 deletions metaflow/plugins/aip/aip_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@
check_metadata_service_version,
)
from metaflow.plugins.aip.argo_utils import (
run_id_to_url,
run_id_to_metaflow_url,
to_metaflow_run_id,
get_argo_url,
get_metaflow_url,
get_metaflow_run_id,
)
from metaflow.plugins.aip.aip_decorator import AIPException
from metaflow.plugins.aip.aip_step_init import save_step_environment_variables
Expand Down Expand Up @@ -421,9 +421,9 @@ def _echo_workflow_run(
):
argo_workflow_name = workflow_manifest["metadata"]["name"]
argo_workflow_uid = workflow_manifest["metadata"]["uid"]
metaflow_run_id = to_metaflow_run_id(argo_workflow_uid)
metaflow_ui_url = run_id_to_metaflow_url(flow_name, argo_workflow_uid)
argo_ui_url = run_id_to_url(
metaflow_run_id = get_metaflow_run_id(argo_workflow_uid)
metaflow_ui_url = get_metaflow_url(flow_name, argo_workflow_uid)
argo_ui_url = get_argo_url(
cloudw marked this conversation as resolved.
Show resolved Hide resolved
argo_workflow_name, kubernetes_namespace, argo_workflow_uid
)
obj.echo(f"Metaflow run_id=*{metaflow_run_id}*\n", fg="magenta")
Expand Down
6 changes: 2 additions & 4 deletions metaflow/plugins/aip/aip_exit_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from metaflow._vendor import click
import logging

from metaflow.plugins.aip import run_id_to_url
from metaflow.plugins.aip import get_argo_url

_logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -71,9 +71,7 @@ def email_notify(send_to):
email_body = get_env("METAFLOW_NOTIFY_EMAIL_BODY", "")
k8s_namespace = get_env("POD_NAMESPACE", "")

argo_ui_url = run_id_to_url(
argo_workflow_name, k8s_namespace, argo_workflow_uid
)
argo_ui_url = get_argo_url(argo_workflow_name, k8s_namespace, argo_workflow_uid)
body = (
f"status = {status} <br/>\n"
f"{argo_ui_url} <br/>\n"
Expand Down
4 changes: 2 additions & 2 deletions metaflow/plugins/aip/aip_udf_exit_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

from metaflow.decorators import flow_decorators, FlowDecorator
from metaflow.graph import FlowGraph
from metaflow.plugins.aip import run_id_to_url
from metaflow.plugins.aip import get_argo_url

_logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -44,7 +44,7 @@ def get_env(name, default=None) -> str:

argo_workflow_name = get_env("MF_ARGO_WORKFLOW_NAME", "")
k8s_namespace = get_env("POD_NAMESPACE", "")
argo_ui_url = run_id_to_url(argo_workflow_name, k8s_namespace, argo_workflow_uid)
argo_ui_url = get_argo_url(argo_workflow_name, k8s_namespace, argo_workflow_uid)

metaflow_configs: Dict[str, str] = json.loads(metaflow_configs_json)
metaflow_configs_new: Dict[str, str] = {
Expand Down
16 changes: 15 additions & 1 deletion metaflow/plugins/aip/argo_client.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# talebz copied from https://github.com/Netflix/metaflow/blob/master/metaflow/plugins/argo/argo_client.py

import json
from typing import Any, Dict, Optional
from typing import Any, Dict, Optional, List

from metaflow.exception import MetaflowException
from metaflow.plugins.aws.eks.kubernetes_client import KubernetesClient
Expand Down Expand Up @@ -83,6 +83,20 @@ def create_workflow_config_map(self, name: str, config_map: Dict[str, Any]):
json.loads(e.body)["message"] if e.body is not None else e.reason
)

def list_workflow_template(self, namespace: Optional[str] = None):
cloudw marked this conversation as resolved.
Show resolved Hide resolved
client = self._client.get()
try:
return client.CustomObjectsApi().list_namespaced_custom_object(
group=self._group,
version=self._version,
namespace=namespace or self._namespace,
plural="workflowtemplates",
)
except client.rest.ApiException as e:
raise ArgoClientException(
json.loads(e.body)["message"] if e.body is not None else e.reason
)

def trigger_workflow_template(self, name: str, parameters: Optional[Dict] = None):
client = self._client.get()
body = {
Expand Down
Loading
Loading