Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Upload Flytekit runtime metrics to S3 #1683

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions flytekit/bin/entrypoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,9 @@ def _dispatch_execute(
ctx.file_access.put_data(ctx.execution_state.engine_dir, output_prefix, is_multipart=True)
logger.info(f"Engine folder written successfully to the output prefix {output_prefix}")

# Upload the flytekit running time metrics to remote storage
utils._output_span()
# If deck is not disabled, upload the deck.html to remote storage
if not task_def.disable_deck:
_output_deck(task_def.name.split(".")[-1], ctx.user_space_params)

Expand Down
1 change: 0 additions & 1 deletion flytekit/core/base_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
IgnoreOutputs

"""

import collections
import datetime
from abc import abstractmethod
Expand Down
2 changes: 2 additions & 0 deletions flytekit/core/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
OUTPUT_FILE_NAME = "outputs.pb"
FUTURES_FILE_NAME = "futures.pb"
ERROR_FILE_NAME = "error.pb"
SPAN_FILE_NAME = "span.pb"
DECK_FILE_NAME = "deck.html"


class SdkTaskType(object):
Expand Down
24 changes: 24 additions & 0 deletions flytekit/core/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@
from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, cast

from flyteidl.core import tasks_pb2 as _core_task
from flyteidl.core.metrics_pb2 import Span

from flytekit.core import constants as _constants
from flytekit.core.pod_template import PodTemplate
from flytekit.loggers import logger

Expand Down Expand Up @@ -331,3 +333,25 @@ def __exit__(self, exc_type, exc_val, exc_tb):
end_process_time - self._start_process_time,
)
)


def _output_span():
from flytekit.core.context_manager import FlyteContextManager

ctx = FlyteContextManager.current_context()
root_span = Span()

for info in ctx.user_space_params.timeline_deck.time_info: # type: ignore
span = Span()
span.operation_id = info["Name"]
span.start_time.FromDatetime(info["Start"])
span.end_time.FromDatetime(info["Finish"])
root_span.spans.append(span)

local_dir = ctx.file_access.get_random_local_directory()
local_path = f"{local_dir}{_os.sep}{_constants.SPAN_FILE_NAME}"
with open(local_path, "wb") as f:
f.write(root_span.SerializeToString())

remote_path = f"{ctx.user_space_params.output_metadata_prefix}{_os.sep}{_constants.SPAN_FILE_NAME}" # type: ignore
ctx.file_access.put_data(local_path, remote_path)
6 changes: 3 additions & 3 deletions flytekit/deck/deck.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@
import typing
from typing import Optional

from flytekit.core import constants as _constants
from flytekit.core.context_manager import ExecutionParameters, ExecutionState, FlyteContext, FlyteContextManager
from flytekit.loggers import logger
from flytekit.tools.interactive import ipython_check

OUTPUT_DIR_JUPYTER_PREFIX = "jupyter"
DECK_FILE_NAME = "deck.html"


class Deck:
Expand Down Expand Up @@ -146,12 +146,12 @@ def _get_deck(
def _output_deck(task_name: str, new_user_params: ExecutionParameters):
ctx = FlyteContext.current_context()
local_dir = ctx.file_access.get_random_local_directory()
local_path = f"{local_dir}{os.sep}{DECK_FILE_NAME}"
local_path = f"{local_dir}{os.sep}{_constants.DECK_FILE_NAME}"
with open(local_path, "w") as f:
f.write(_get_deck(new_user_params, ignore_jupyter=True))
logger.info(f"{task_name} task creates flyte deck html to file://{local_path}")
if ctx.execution_state.mode == ExecutionState.Mode.TASK_EXECUTION:
remote_path = f"{new_user_params.output_metadata_prefix}{os.sep}{DECK_FILE_NAME}"
remote_path = f"{new_user_params.output_metadata_prefix}{os.sep}{_constants.DECK_FILE_NAME}"
kwargs: typing.Dict[str, str] = {
"ContentType": "text/html", # For s3
"content_type": "text/html", # For gcs
Expand Down