diff --git a/docs/index.md b/docs/index.md index 797b323842d..f1755c0c591 100644 --- a/docs/index.md +++ b/docs/index.md @@ -134,11 +134,11 @@ Have questions or need support? The best way to reach us is through Slack: :maxdepth: 1 :hidden: -Introduction -Quickstart guide -Getting started with workflow development -Flyte Fundamentals -Core Use Cases +Introduction +Quickstart guide +Getting started with workflow development +Flyte Fundamentals +Core Use Cases ``` ```{toctree} diff --git a/docs/introduction/core_use_cases/analytics.md b/docs/introduction/core_use_cases/analytics.md new file mode 100644 index 00000000000..b517f92bf91 --- /dev/null +++ b/docs/introduction/core_use_cases/analytics.md @@ -0,0 +1,187 @@ +--- +jupytext: + formats: md:myst + text_representation: + extension: .md + format_name: myst +kernelspec: + display_name: Python 3 + language: python + name: python3 + +next-page: userguide +next-page-title: User Guide +--- + +(getting_started_analytics)= + +# Analytics + +Flyte is ideal for data cleaning, statistical summarization, and plotting +because with `flytekit` you can leverage the rich Python ecosystem of data +processing and visualization tools. + +## Cleaning Data + +In this example, we're going to analyze some covid vaccination data: + +```{code-cell} ipython3 +import pandas as pd +import plotly +import plotly.graph_objects as go +from flytekit import Deck, task, workflow, Resources + + +@task(requests=Resources(mem="1Gi")) +def clean_data() -> pd.DataFrame: + """Clean the dataset.""" + df = pd.read_csv("https://covid.ourworldindata.org/data/owid-covid-data.csv") + filled_df = ( + df.sort_values(["people_vaccinated"], ascending=False) + .groupby("location") + .first() + .reset_index() + )[["location", "people_vaccinated", "population", "date"]] + return filled_df +``` + +As you can see, we're using `pandas` for data processing, and in the task +below we use `plotly` to create a choropleth map of the percent of a country's +population that has received at least one COVID-19 vaccination. + +## Rendering Plots + +We can use {ref}`Flyte Decks ` for rendering a static HTML report +of the map. In this case, we normalize the `people_vaccinated` by the +`population` count of each country: + +```{code-cell} ipython3 +@task(disable_deck=False) +def plot(df: pd.DataFrame): + """Render a Choropleth map.""" + df["text"] = df["location"] + "
" + "Last updated on: " + df["date"] + fig = go.Figure( + data=go.Choropleth( + locations=df["location"], + z=df["people_vaccinated"].astype(float) / df["population"].astype(float), + text=df["text"], + locationmode="country names", + colorscale="Blues", + autocolorscale=False, + reversescale=False, + marker_line_color="darkgray", + marker_line_width=0.5, + zmax=1, + zmin=0, + ) + ) + + fig.update_layout( + title_text=( + "Percent population with at least one dose of COVID-19 vaccine" + ), + geo_scope="world", + geo=dict( + showframe=False, showcoastlines=False, projection_type="equirectangular" + ), + ) + Deck("Choropleth Map", plotly.io.to_html(fig)) + + +@workflow +def analytics_workflow(): + """Prepare a data analytics workflow.""" + plot(df=clean_data()) +``` + +Running this workflow, we get an interative plot, courtesy of `plotly`: + +```{code-cell} ipython3 +--- +tags: [remove-input] +--- + +# this is an unrendered cell, used to capture the logs in order to render the +# Flyte Decks directly in the docs. +import logging +import os +import re +from pythonjsonlogger import jsonlogger +from IPython.display import HTML + + +class DeckFilter(logging.Filter): + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.formatter = jsonlogger.JsonFormatter( + fmt="%(asctime)s %(name)s %(levelname)s %(message)s" + ) + self.logs = [] + self.deck_files = {} + + def filter(self, record): + patt = "(.+) task creates flyte deck html to (.+/deck.html)" + msg = record.getMessage() + matches = re.match(patt, msg) + + if msg == "Connection error. Skip stats collection.": + return False + + if matches: + task, filepath = matches.group(1), matches.group(2) + self.logs.append(self.formatter.format(record)) + self.deck_files[task] = re.sub("^file://", "", filepath) + return False + + +logger = logging.getLogger("flytekit") +logger.setLevel(20) + +deck_filter = DeckFilter() +logger.addFilter(deck_filter) +``` + +```{code-cell} ipython3 +analytics_workflow() +``` + +```{code-cell} ipython3 +--- +tags: [remove-input] +--- + +import os +import shutil +from pathlib import Path + +def cp_deck(src): + src = Path(src) + target = Path.cwd() / "_flyte_decks" / src.parent.name + target.mkdir(parents=True, exist_ok=True) + shutil.copy(src, target) + return target / "deck.html" + +logger.removeFilter(deck_filter) +HTML(filename=cp_deck(deck_filter.deck_files["plot"])) +``` + +## Custom Flyte Deck Renderers + +You can also create your own {ref}`custom Flyte Deck renderers ` +to visualize data with any plotting/visualization library of your choice, as +long as you can render HTML for the objects of interest. + +```{important} +Prefer other data processing frameworks? Flyte ships with +[Polars](https://github.com/flyteorg/flytekit/tree/master/plugins/flytekit-polars), +{ref}`Dask `, {ref}`Modin `, {ref}`Spark `, +[Vaex](https://github.com/flyteorg/flytekit/tree/master/plugins/flytekit-vaex), +and [DBT](https://github.com/flyteorg/flytekit/tree/master/plugins/flytekit-dbt) +integrations. + +If you need to connect to a database, Flyte provides first-party +support for {ref}`AWS Athena `, {ref}`Google Bigquery `, +{ref}`Snowflake `, {ref}`SQLAlchemy `, and +{ref}`SQLite3 `. +``` diff --git a/docs/introduction/core_use_cases/data_engineering.md b/docs/introduction/core_use_cases/data_engineering.md new file mode 100644 index 00000000000..031ae46fcba --- /dev/null +++ b/docs/introduction/core_use_cases/data_engineering.md @@ -0,0 +1,180 @@ +--- +jupytext: + formats: md:myst + text_representation: + extension: .md + format_name: myst +kernelspec: + display_name: Python 3 + language: python + name: python3 +--- + +(getting_started_data_engineering)= + +# Data Engineering + +Flyte is well-suited for data engineering use cases, where you can interleave +SQL queries with data processing logic implemented in Python with whichever +data processing tools you prefer. + +In this example, we create an ETL workflow that extracts data from a public +[RNA database](https://rnacentral.org/help/public-database), performs some simple +transforms on the data, and loads it into a CSV file. + +## Extract + +First, we define an `extract_task` task using the +{ref}`flytekitplugins-sqlalchemy ` plugin, which provides an +interface to perform SQL queries via the +{py:class}`~flytekitplugins.sqlalchemy.SQLAlchemyTask` +and {py:class}`~flytekitplugins.sqlalchemy.SQLAlchemyConfig` classes. + +```{code-cell} ipython3 +import os + +import flytekit +import pandas as pd +from flytekit import Resources, kwtypes, task, workflow +from flytekit.types.file import CSVFile +from flytekitplugins.sqlalchemy import SQLAlchemyConfig, SQLAlchemyTask + +DATABASE_URI = ( + "postgresql://reader:NWDMCE5xdipIjRrp@hh-pgsql-public.ebi.ac.uk:5432/pfmegrnargs" +) + +extract_task = SQLAlchemyTask( + "extract_rna", + query_template=( + "select len as sequence_length, timestamp from rna " + "where len >= {{ .inputs.min_length }} and len <= {{ .inputs.max_length }} " + "limit {{ .inputs.limit }}" + ), + inputs=kwtypes(min_length=int, max_length=int, limit=int), + output_schema_type=pd.DataFrame, + task_config=SQLAlchemyConfig(uri=DATABASE_URI), +) +``` + +You can format the `query_template` with `{{ .inputs. }}` to +parameterize your query with the `input` keyword type specification, which +maps task argument names to their expected types. + +```{important} +You can request for access to secrets via the `secret_requests` of the +{py:class}`~flytekitplugins.sqlalchemy.SQLAlchemyTask` constructor, then +pass in a `secret_connect_args` argument to the +{py:class}`~flytekitplugins.sqlalchemy.SQLAlchemyConfig` constructor, assuming +that you have connection credentials available in the configured +{ref}`Secrets Management System `, which is K8s by default. +``` + +## Transform + +Next, we parse the raw `timestamp`s and represent the time as separate `date` +and `time` columns. Notice that we can encode the assumptions we have about this +task's resource requirements with the {py:class}`~flytekit.Resources` object. +If those assumptions ever change we can update the resource request here, or +override it at the workflow-level with the {ref}`with_overrides ` method. + +```{code-cell} ipython3 +@task(requests=Resources(mem="700Mi")) +def transform(df: pd.DataFrame) -> pd.DataFrame: + """Add date and time columns; drop timestamp column.""" + timestamp = pd.to_datetime(df["timestamp"]) + df["date"] = timestamp.dt.date + df["time"] = timestamp.dt.time + df.drop("timestamp", axis=1, inplace=True) + return df +``` + +## Load + +Finally, we load the transformed data into its final destination: a CSV file in +blob storage. Flyte has a built-in `CSVFile` type that automatically handles +serializing/deserializing and uploading/downloading the file as it's passed from +one task to the next. All you need to do is write the file to some local location +and pass that location to the `path` argument of `CSVFile`. + +```{code-cell} ipython3 +@task(requests=Resources(mem="700Mi")) +def load(df: pd.DataFrame) -> CSVFile: + """Load the dataframe to a csv file.""" + csv_file = os.path.join(flytekit.current_context().working_directory, "rna_df.csv") + df.to_csv(csv_file, index=False) + return CSVFile(path=csv_file) +``` + +## ETL Workflow + +Putting all the pieces together, we create an `etl_workflow` that produces a +dataset based on the parameters you give it. + +```{code-cell} ipython3 +@workflow +def etl_workflow( + min_length: int = 50, max_length: int = 200, limit: int = 10 +) -> CSVFile: + """Build an extract, transform and load pipeline.""" + return load( + df=transform( + df=extract_task(min_length=min_length, max_length=max_length, limit=limit) + ) + ) +``` + +During local execution, this CSV file lives in a random local +directory, but when the workflow is run on a Flyte cluster, this file lives in +the configured blob store, like S3 or GCS. + +Running this workflow locally, we can access the CSV file and read it into +a `pandas.DataFrame`. + +```{code-cell} ipython3 +csv_file = etl_workflow(limit=5) +pd.read_csv(csv_file) +``` + +## Workflows as Reusable Components + +Because Flyte tasks and workflows are simply functions, we can embed +`etl_workflow` as part of a larger workflow, where it's used to create a +CSV file that's then consumed by downstream tasks or subworkflows: + +```{code-cell} ipython3 +@task +def aggregate(file: CSVFile) -> pd.DataFrame: + data = pd.read_csv(file) + ... # process the data further + + +@task +def plot(data: pd.DataFrame): + ... # create a plot + + +@workflow +def downstream_workflow( + min_length: int = 50, max_length: int = 200, limit: int = 10 +): + """A downstream workflow that visualizes an aggregation of the data.""" + csv_file = etl_workflow( + min_length=min_length, + max_length=max_length, + limit=limit, + ) + return plot(data=aggregate(file=csv_file)) +``` + +```{important} +Prefer other data processing frameworks? Flyte ships with +[Polars](https://github.com/flyteorg/flytekit/tree/master/plugins/flytekit-polars), +{ref}`Dask `, {ref}`Modin `, {ref}`Spark `, +[Vaex](https://github.com/flyteorg/flytekit/tree/master/plugins/flytekit-vaex), +and [DBT](https://github.com/flyteorg/flytekit/tree/master/plugins/flytekit-dbt) +integrations. + +For database connectors, Flyte provides first-party support for {ref}`AWS Athena `, +{ref}`Google Bigquery `, {ref}`Snowflake `, +{ref}`SQLAlchemy `, and {ref}`SQLite3 `. +``` diff --git a/docs/introduction/core_use_cases/index.md b/docs/introduction/core_use_cases/index.md new file mode 100644 index 00000000000..a2fc0786895 --- /dev/null +++ b/docs/introduction/core_use_cases/index.md @@ -0,0 +1,47 @@ +--- +next-page: getting_started/data_engineering +next-page-title: Data Engineering +--- + +(getting_started_core_use_cases)= + +# Core Use Cases + +This section of the **Getting Started** documentation will take you through the +core use cases that Flyte is designed for. Within the context of these guides, +we're going to assume that the discipline of data science can be broken down into +at least three specializations: data engineering, machine learning (or +statistical modeling more broadly), and analytics. + +The purpose of these guides is to provide you, the data and ML practitioner, +some practical and simple examples of how Flyte can help you in your daily +practice. + +```{list-table} +:header-rows: 0 +:widths: 10 30 + +* - {doc}`🛠 Data Engineering ` + - Create an ETL workflow for processing data with SQLAlchemy and Pandas. +* - {doc}`🤖 Machine Learning ` + - Train a classifier with Scikit-Learn and Pandas. +* - {doc}`📈 Analytics ` + - Develop a data cleaning and plotting pipeline with Plotly and Pandas. +``` + +```{admonition} Learn more +:class: important + +Check out more examples in the {ref}`Tutorials ` section, which +includes examples of Flyte in specific domains like +{ref}`bioinformatics `. +``` + +```{toctree} +:maxdepth: -1 +:hidden: + +data_engineering +machine_learning +analytics +``` diff --git a/docs/introduction/core_use_cases/machine_learning.md b/docs/introduction/core_use_cases/machine_learning.md new file mode 100644 index 00000000000..c5b28887c5f --- /dev/null +++ b/docs/introduction/core_use_cases/machine_learning.md @@ -0,0 +1,131 @@ +--- +jupytext: + formats: md:myst + text_representation: + extension: .md + format_name: myst +kernelspec: + display_name: Python 3 + language: python + name: python3 +--- + +(getting_started_machine_learning)= + +# Machine Learning + +Flyte can handle a full spectrum of machine learning workloads, from +training small models to gpu-accelerated deep learning and hyperparameter +optimization. + +## Getting the Data + +In this simple example, we train a binary classification model on the +[wine dataset](https://scikit-learn.org/stable/datasets/toy_dataset.html#wine-dataset) +that's available through the `scikit-learn` package: + +```{code-cell} ipython3 +import pandas as pd +from flytekit import Resources, task, workflow +from sklearn.datasets import load_wine +from sklearn.linear_model import LogisticRegression + +import flytekit.extras.sklearn + + +@task(requests=Resources(mem="500Mi")) +def get_data() -> pd.DataFrame: + """Get the wine dataset.""" + return load_wine(as_frame=True).frame +``` + +## Define a Training Workflow + +Then, we define `process_data` and `train_model` tasks along with a +`training_workflow` to put all the pieces together for a model-training +pipeline. + +```{code-cell} ipython3 +@task +def process_data(data: pd.DataFrame) -> pd.DataFrame: + """Simplify the task from a 3-class to a binary classification problem.""" + return data.assign(target=lambda x: x["target"].where(x["target"] == 0, 1)) + + +@task +def train_model(data: pd.DataFrame, hyperparameters: dict) -> LogisticRegression: + """Train a model on the wine dataset.""" + features = data.drop("target", axis="columns") + target = data["target"] + return LogisticRegression(max_iter=5000, **hyperparameters).fit(features, target) + + +@workflow +def training_workflow(hyperparameters: dict) -> LogisticRegression: + """Put all of the steps together into a single workflow.""" + data = get_data() + processed_data = process_data(data=data) + return train_model( + data=processed_data, + hyperparameters=hyperparameters, + ) + +``` + +```{important} +Even though you can use a `dict` type to represent the model's hyperparameters, +we recommend using {ref}`dataclasses ` to define a custom +`Hyperparameter` Python object that provides more type information to the Flyte +compiler. For example, Flyte uses this type information to auto-generate +type-safe launch forms on the Flyte UI. Learn more in the +{ref}`Extending Flyte ` guide. +``` + +## Computing Predictions + +Executing this workflow locally, we can call the `model.predict` method to make +sure we can use our newly trained model to make predictions based on some +feature matrix. + +```{code-cell} ipython3 +model = training_workflow(hyperparameters={"C": 0.01}) +X, _ = load_wine(as_frame=True, return_X_y=True) +model.predict(X.sample(10, random_state=41)) +``` + +## Extending your ML Workloads + +There are many ways to extend your workloads: + +```{list-table} +:header-rows: 0 +:widths: 20 30 + +* - **🏔 Vertical Scaling** + - Use the {py:class}`~flytekit.Resources` task keyword argument to request + additional CPUs, GPUs, and/or memory. +* - **🗺 Horizontal Scaling** + - With constructs like {py:func}`~flytekit.dynamic` workflows and + {py:func}`~flytekit.map_task`s, implement gridsearch, random search, + and even [bayesian optimization](https://github.com/flyteorg/flytekit-python-template/tree/main/bayesian-optimization/%7B%7Bcookiecutter.project_name%7D%7D). +* - **🔧 Specialized Tuning Libraries** + - Use the {ref}`Ray Integration ` and leverage tools like + [Ray Tune](https://docs.ray.io/en/latest/tune/index.html) for hyperparameter + optimization, all orchestrated by Flyte as ephemerally-provisioned Ray clusters. +* - **📦 Ephemeral Cluster Resources** + - Use the {ref}`MPI Operator `, {ref}`Sagemaker `, + {ref}`Kubeflow Tensorflow `, {ref}`Kubeflow Pytorch` + and {doc}`more <_tags/DistributedComputing>` to do distributed training. +* - **🔎 Experiment Tracking** + - Auto-capture training logs with the {py:func}`~flytekitplugins.mlflow.mlflow_autolog` + decorator, which can be viewed as Flyte Decks with `@task(disable_decks=False)`. +* - **⏩ Inference Acceleration** + - Serialize your models in ONNX format using the {ref}`ONNX plugin `, which + supports ScikitLearn, TensorFlow, and PyTorch. +``` + +```{admonition} Learn More +:class: important + +See the {ref}`Tutorials ` for more machine learning examples. +``` diff --git a/docs/introduction/flyte_fundamentals/extending_flyte.md b/docs/introduction/flyte_fundamentals/extending_flyte.md new file mode 100644 index 00000000000..7390ac93202 --- /dev/null +++ b/docs/introduction/flyte_fundamentals/extending_flyte.md @@ -0,0 +1,169 @@ +--- +jupytext: + formats: md:myst + text_representation: + extension: .md + format_name: myst +kernelspec: + display_name: Python 3 + language: python + name: python3 +--- + +(getting_started_extending_flyte)= + +# Extending Flyte + +Once you have a hang of the fundamentals of Flyte, you may find that you want +to use it in ways that aren't supported out-of-the-box. Fortunately, Flyte +provides multiple extension points that enable you make it more powerful +for your specific use cases. + +This guide will walk you through two of the many different ways you can extend +the Flyte type system and Flyte tasks. + +(customizing_flyte_types)= + +## Customizing Flyte Types + +Flyte has a {ref}`rich type system ` that automatically +handles the serialization and deserialization of objects so that when you pass +data from one task to the next, you don't have to write a bunch of boilerplate +code. + +However, the types that ship with Flyte or one of Flyte's +{ref}`first-party integrations ` may not fulfill your needs. In +this case, you'll need to create your own. + +The easiest way to do with is with the {py:mod}`dataclasses` module, which +let you compose several Flyte-supported types into a single object. For +example, suppose you want to support a coordinates data type with arbitrary +metadata: + +```{code-cell} ipython3 +import typing + +from dataclasses import dataclass +from mashumaro.mixins.json import DataClassJSONMixin + +@dataclass +class Coordinate(DataClassJSONMixin): + """A custom type for coordinates with metadata attached.""" + x: float + y: float + metadata: typing.Dict[str, float] +``` + +You can then use this as a new type in your tasks and workflows: + +```{code-cell} ipython3 +from flytekit import task + +@task +def generate_coordinates(num: int) -> typing.List[Coordinate]: + """Generate some coordinates.""" + ... + +@task +def subset_coordinates( + coordinates: typing.List[Coordinate], x_min: float, x_max: float, +) -> typing.List[Coordinate]: + """Select coordinates within a certain x-axis range.""" + ... +``` + +```{important} +The limitation of using the approach above is that you can only compose types +that are already supported by Flyte. + +To create entirely new types, you'll need to use the +{py:class}`~flytekit.extend.TypeTransformer` interface to explicitly handle +the way in which the object is (a) serialized as a task output and (b) +deserialized when passed into a task as an input. + +See the {ref}`User Guide ` for an example of a custom +type. +``` + +## Customizing Flyte Tasks + +The easiest way to extend Flyte tasks is to use Python decorators. Since Flyte +tasks are simply functions, you can wrap the task function in a custom +decorator _before_ wrapping the entire function in the `@task` decorator. + +For example, if we want to do something before and after the actual task function +is invoked, we can do the following: + +```{code-cell} ipython3 +from functools import partial, wraps + +def decorator(fn): + + @wraps(fn) + def wrapper(*args, **kwargs): + print("do something before") + out = fn(*args, **kwargs) + print("do something after") + return out + + return wrapper +``` + +Then, making sure `@task` is the outermost decorator, we can modify the +behavior of the task: + +```{code-cell} ipython3 +@task +@decorator +def add_one(x: int) -> int: + return x + 1 + +add_one(x=10) +``` + +This approach allows you to call out to some external service or library before +and after your task function body is executed. For example, this pattern is used +by the MLFlow integration via the {py:func}`~flytekitplugins.mlflow.mlflow_autolog` +decorator to auto-log metrics during a model-training task. + +```{note} +You can stack multiple decorators on top of each other. Learn more in the +{ref}`User Guide `. + +Flyte also supports a setup-teardown pattern at the workflow level, which +allows you to enable/disable services at the beginning/end of your workflows. +See the {ref}`User Guide ` for more details. +``` + +## The Plugin Hierarchy of Needs + +The decorator approach is great for many surface-level use cases, but there are +many more ways to customize Flyte tasks: + +```{list-table} +:header-rows: 0 +:widths: 10 30 + +* - {ref}`Pre-built Container Task Plugins ` + - Task extensions that use pre-built containers, useful for tasks that don't + require user-defined code and simply rely on input parameters. +* - {ref}`User Container Task Plugins ` + - Task extensions that require user-built containers when the task also + requires user-defined code. +* - {ref}`Raw Container Tasks ` + - These tasks can be implemented in other programming languages like R, + Julia, etc. Useful for leveraging highly optimized domain-specific libraries + in other languages outside of the `flytekit` SDK language. +* - {ref}`Backend Plugins ` + - These tasks plugins require implementing a backend plugin to leverage + external services like Sagemaker, Snowflake, BigQuery, etc. +``` + +## What's Next? + +Congratulations! 🎉 You've just completed the Flyte Fundamentals tour. + +The final section in the getting started section of the docs will provide you +with some {ref}`core use cases ` for implementing +your first workflows, whether you're a data scientist, data analyst, data engineer, +or machine learning engineer. diff --git a/docs/introduction/flyte_fundamentals/index.md b/docs/introduction/flyte_fundamentals/index.md new file mode 100644 index 00000000000..c1cadf41656 --- /dev/null +++ b/docs/introduction/flyte_fundamentals/index.md @@ -0,0 +1,58 @@ +--- +prev-page: getting_started/creating_and_running_a_flyte_launch_plan +prev-page-title: Creating and running a Flyte Launch Plan +next-page: getting_started/tasks_and_workflows +next-page-title: Tasks, Workflows and LaunchPlans +--- + +(getting_started_fundamentals)= + +# Flyte Fundamentals + +This section of the **Getting Started** documentation will take you through the +fundamental concepts of Flyte: tasks, workflows, and launch plans. + +You'll learn about the full development lifecycle of creating a project, +registering workflows, and running them on a demo Flyte cluster. These +guides will also walk you through how to visualize artifacts associated with +tasks, optimize them for scale and performance, and extend Flyte for your own +use cases. + +```{list-table} +:header-rows: 0 +:widths: 20 30 + +* - {doc}`🔀 Tasks, Workflows and LaunchPlans ` + - Create tasks as building blocks, compose them into workflows, and schedule + them with launchplans. +* - {doc}`🗄 Registering Workflows ` + - Develop and deploy workflows to a local Flyte demo cluster. +* - {doc}`⏱ Running and Scheduling Workflows ` + - Execute workflows programmatically and schedule them as cron jobs. +* - {doc}`📊 Visualizing Task Input and Output ` + - Create rich, customizable static reports for increased visibility into tasks. +* - {doc}`🏎 Optimizing Tasks ` + - Make tasks scalable, performant, and robust to unexpected failures. +* - {doc}`🔌 Extending Flyte ` + - Customize Flyte types and tasks to fit your needs. +``` + +```{admonition} Learn more +:class: important + +For a comprehensive view of all of Flyte's functionality, see the +{ref}`User Guide `, and to learn how to deploy a production Flyte +cluster, see the {ref}`Deployment Guide `. +``` + +```{toctree} +:maxdepth: -1 +:hidden: + +tasks_and_workflows +package_register +run_schedule +visualizing_task_input_and_output +optimizing_tasks +extending_flyte +``` diff --git a/docs/introduction/flyte_fundamentals/optimizing_tasks.md b/docs/introduction/flyte_fundamentals/optimizing_tasks.md new file mode 100644 index 00000000000..6a2632c74ef --- /dev/null +++ b/docs/introduction/flyte_fundamentals/optimizing_tasks.md @@ -0,0 +1,265 @@ +--- +jupytext: + formats: md:myst + text_representation: + extension: .md + format_name: myst +kernelspec: + display_name: Python 3 + language: python + name: python3 +--- + +(getting_started_optimizing_tasks)= + +# Optimizing Tasks + +There are many ways to optimize your tasks and workflows in Flyte, and this guide +will take you through just some the common methods for doing so. + +## Caching + +Caching allows you to avoid re-running potentially expensive tasks. You can +specify which tasks to cache with the `cache` and `cache_version` arguments: + +```{code-cell} ipython3 +from typing import List +from flytekit import task, workflow + + +@task(cache=True, cache_version="1") +def compute_mean(data: List[float]) -> float: + return sum(data) / len(data) + +@workflow +def wf(data: List[float]) -> float: + return compute_mean(data=data) +``` + +Caching works both locally and on a Flyte backend. + +```{code-cell} ipython3 +%timeit -n 1 -r 1 wf(data=[float(x) for x in range(100_000)]) +``` + +```{code-cell} ipython3 +%timeit -n 1 -r 1 wf(data=[float(x) for x in range(100_000)]) +``` + +As you can see, the second call to the `wf` workflow takes less time because +Flyte simply hits the cache to obtain the result. + +```{note} +For file-like data types like {py:class}`flytekit.types.file.FlyteFile` and offloaded +data types like `pandas.DataFrame` objects, you can provide a hash function that +represents the cache key. Learn more in the {ref}`User Guide `. +``` + +## Retries + +Flyte also allows you to automatically retry failing tasks in the case of +system-level or catastrophic errors that may arise from issues that don't have +anything to do with user-defined code, like network issues and data center +outages. + +The following version of the `compute_mean` task simulates these kinds of +errors by randomly throwing a `RuntimeError` 5% of the time: + +```{code-cell} ipython3 +import random + +@task(retries=3) +def compute_mean(data: List[float]) -> float: + if random() < 0.05: + raise RuntimeError("Something bad happened 🔥") + return sum(data) / len(data) +``` + +```{note} +Retries only take effect when running a task on a Flyte cluster. +``` + +## Timeouts + +To protect against zombie tasks that hang due to system-level issues, you +can supply the `timeout` argument to the `@task` decorator to make sure that +problematic tasks adhere to a maximum runtime. + +In this example, we make sure that the task is terminated after it's been +running for more that one hour. + +```{code-cell} ipython3 +from datetime import timedelta + +@task(timeout=timedelta(hours=1)) +def compute_mean(data: List[float]) -> float: + return sum(data) / len(data) +``` + +Notice that the `timeout` argument takes a built-in Python +{py:class}`~datetime.timedelta` object. + +## Map Tasks + +If you need to parallelize a task, you can use the {py:func}`~flytekit.map_task` +construct. A mappable task is one that takes in a single argument and produces +some output. + +In this example, we partition our data into chunks so that we can horizontally +scale our workload: + +```{code-cell} ipython3 +import math +from typing import Tuple + +from flytekit import map_task + + +@task +def sum_and_length(data: List[float]) -> List[float]: + """Return the sum and length of a dataset of numbers.""" + return [sum(data), float(len(data))] + + +@task +def prepare_partitions(data: List[float], n_partitions: int) -> List[List[float]]: + """Create partitions from the full dataset.""" + size = math.ceil(len(data) / n_partitions) + return [data[size * i: size * (i + 1)] for i in range(n_partitions)] + +@task +def reduce(results: List[List[float]]) -> float: + """Combine results from the map task.""" + total, length = 0.0, 0.0 + for sub_total, sub_length in results: + total += sub_total + length += sub_length + return total / length + + +@workflow +def parallelized_compute_mean(data: List[float], n_partitions: int = 10) -> float: + """An embarrassingly parallel implementation to compute the mean from data.""" + partitioned_data = prepare_partitions(data=data, n_partitions=n_partitions) + + # use map_task to apply the sum_and_length task to the partitions + results = map_task(sum_and_length)(data=partitioned_data) + return reduce(results=results) + + +parallelized_compute_mean(data=[float(x) for x in range(10_000)]) +``` + +## Resource Allocation + +As one of the core features of Flyte, workflows can be composed of tasks that +potentially have heterogeneous resource requirements. You can express this with +the {py:class}`~flytekit.Resources` object: + +```{code-cell} ipython3 +from flytekit import Resources + + +@task(requests=Resources(cpu="2", mem="100Mi")) +def light_task() -> float: + ... + + +@task(requests=Resources(cpu="16", mem="16Gi")) +def heavy_task() -> float: + ... +``` + +## Multi-image Workflows + +In addition to task-level resource configuration, you can +also specify different images per task. This is particularly useful if some +tasks in your workflow have a different set of dependencies (e.g. require CUDA +to be installed for model training) where most of the other tasks can use +another image. + +In this example we specify two tasks: one that uses CPUs and another that uses +GPUs. For the former task, we use the default image that ships with `flytekit` +and for the latter task, we specify a pre-built image that the core Flyte team +maintains that enables distributed training with the +{ref}`Kubeflow Pytorch integration `. + +```{code-cell} ipython3 +import numpy as np +import torch.nn as nn + +@task( + requests=Resources(cpu="2", mem="16Gi"), + container_image="ghcr.io/flyteorg/flytekit:py3.9-latest", +) +def get_data() -> Tuple[np.ndarray, np.ndarray]: + ... # get dataset as numpy ndarrays + + +@task( + requests=Resources(cpu="4", gpu="1", mem="16Gi"), + container_image="ghcr.io/flyteorg/flytecookbook:kfpytorch-latest", +) +def train_model(features: np.ndarray, target: np.ndarray) -> nn.Module: + ... # train a model using gpus +``` + +These tasks assume that we're going to use the {ref}`pyflyte register ` +command to register these tasks, since these static images will not contain +the code that we defined above. Using `pyflyte register` ensures that `get_data` +and `train_model` are zipped up and Flyte has access to it when they're executed +on a Flyte backend. + +```{important} +You can also configure the container images dynamically. See the +{ref}`User Guide ` for more details. +``` + + +## Declarative Infrastructure + +Finally, staying with the theme of Flyte's ability to handle heterogeneous +workloads at the most granular level, you can configure tasks to leverage +third-party infrastructure via the extensible task plugin system. + +As we saw with the `train_model` task example, we're using a CUDA-enabled image, +but in order to do distributed training, we'll have to leverage the +{py:class}`~flytekitplugins.kfpytorch.PyTorch` plugin: + +```{code-cell} ipython3 +from flytekitplugins.kfpytorch import PyTorch + +@task( + task_config=PyTorch(num_workers=2), + requests=Resources(cpu="2", gpu="1", mem="8Gi"), + limits=Resources(cpu="4", gpu="2", mem="16Gi"), + container_image="ghcr.io/flyteorg/flytecookbook:kfpytorch-latest", +) +def train_model(features: np.ndarray, target: np.ndarray) -> nn.Module: + ... # train a model using gpus + +``` + +This plugin highlights one of the most powerful abilities that you gain with +Flyte: the ability to *declaratively specify infrastructure requirements* +at the most granular level of your workflow! + +When this task is executed on a Flyte cluster, it automatically provisions all of +the resources that you need. In this case, that need is distributed +training, but Flyte also provides integrations for {ref}`Spark `, +{ref}`Ray `, {ref}`MPI `, {ref}`Sagemaker `, +{ref}`Snowflake `, and more. + +Even though Flyte itself is a powerful compute engine and orchestrator for +data engineering, machine learning, and analytics, perhaps you have existing +code that leverages other platforms. Flyte recognizes the pain of migrating code, +so its plugin system enables you to call out to third-party services and +infrastructure when needed so that you can embed existing workloads into the +Flyte programming paradigm. + +## What's Next? + +In this guide, you learned the various ways in which you can optimize your +tasks and workflows to make them more scalable and robust. In the final stop of +the Flyte Fundamentals tour, we'll see how to extend Flyte in the cases where +the built-in functionality doesn't quite fit your needs. diff --git a/docs/introduction/flyte_fundamentals/package_register.md b/docs/introduction/flyte_fundamentals/package_register.md new file mode 100644 index 00000000000..81d280618ed --- /dev/null +++ b/docs/introduction/flyte_fundamentals/package_register.md @@ -0,0 +1,370 @@ +--- +jupytext: + formats: md:myst + text_representation: + extension: .md + format_name: myst +kernelspec: + display_name: Python 3 + language: python + name: python3 +--- + +(getting_started_package_register)= + +# Registering Workflows + +In this guide, you'll learn how to package and register your tasks and +workflows to a Flyte cluster. This will enable you to scale your workloads with +larger memory and compute requirements, schedule your workflows to run on a +pre-defined cadence, and leverage the Flyte backend plugins like Spark. + +```{admonition} Prerequisites +:class: important + +This guide assumes that you: + +- Have a local Flyte cluster running with `flytectl demo start` as described in {ref}`"Running workflows locally" ` guide. +- Followed the {doc}`Creating a Flyte project ` + guide to create a minimal Flyte project. +``` + +## Custom Dependencies + +If you have custom Python dependencies, update the `requirements.txt` file that +ships with the {ref}`project template ` +and those changes will be incorporated into the Docker image. + +You can also update the Dockerfile (if using a Dockerfile) or the [ImageSpec configuration](https://docs.flyte.org/projects/cookbook/en/latest/auto_examples/customizing_dependencies/image_spec.html#image-spec-example) if you +want to use a different base image or if the additional Python dependencies +require installing binaries or packages from other languages. + +# Registration Patterns + +There are different methods of registering your workflows to a Flyte cluster +where each method fulfills a particular use case during the workflow development +cycle. In this section, we'll cover the commands you need to fulfill the +following use cases: + +1. Iterating on a single workflow script. +2. Iterating on a Flyte project with multiple task/workflow modules. +3. Deploying your workflows to a production environment. + +The following diagram provides a summarized view of the different registration patterns: + +![](https://raw.githubusercontent.com/flyteorg/static-resources/main/flytesnacks/getting_started/flyte-registration-patterns.png) + + +(getting_started_register_pyflyte_run)= + +## Iterating on a Single Task or Workflow + +The quickest way to register a task or workflow to a Flyte cluster is with the +`pyflyte run` CLI command. Assuming that you're inside the `my_project` directory +that we created in {doc}`Creating a Flyte project `, +you can invoke it like so: + +```{prompt} bash $ +pyflyte run --remote workflows/example.py wf --name "Gilgamesh" +``` + +````{div} shadow p-3 mb-8 rounded + +**Expected Output:** A URL to the workflow execution on your demo Flyte cluster: + +```{code-block} +Go to http://localhost:30080/console/projects/flytesnacks/domains/development/executions/ to see execution in the console. +``` + +Where ```` is a unique identifier for the workflow execution. + +```` + +`pyflyte run` will not only register the specified workflow `wf`, it will also +run it with the supplied arguments. As you can see from the expected output, you +can visit the link to the Flyte console to see the progress of your running +execution. + +```{note} +`pyflyte run` supports Flyte workflows that import any other user-defined modules that +contain additional tasks or workflows. +``` + +(getting_started_pyflyte_register)= + +### Iterating on a Flyte Project + +One of Flyte's benefits is its functional design, which means that you can +import and reuse tasks and workflows like you would Python functions when you +organize your code into meaningful sets of modules and subpackages. + +When you move past a single script that contains your workflows, use the +`pyflyte register` command to register all the tasks and workflows contained +in the specified directory or file: + +```{prompt} bash $ +pyflyte register workflows +``` + +````{div} shadow p-3 mb-8 rounded + +**Expected Output:** + +```{code-block} bash +Successfully serialized 4 flyte objects +Found and serialized 4 entities + Registering workflows.example.say_hello....done, TASK with version sdYMF0jAkhDh_KA1IMAFYA==. + Registering workflows.example.greeting_length....done, TASK with version sdYMF0jAkhDh_KA1IMAFYA==. + Registering workflows.example.wf....done, WORKFLOW with version sdYMF0jAkhDh_KA1IMAFYA==. + Registering workflows.example.wf....done, LAUNCH_PLAN with version sdYMF0jAkhDh_KA1IMAFYA==. +Successfully registered 4 entities +``` + +```` + +By default, `pyflyte register` uses a [default Docker image](https://ghcr.io/flyteorg/flytekit) +that's maintained by the Flyte team, but you can use your own Docker image by +passing in the `--image` flag. + +For example, assuming that you want to use the latest Python 3.9 flytekit image, +the explicit equivalent to the default image value would be something like: + +```{prompt} bash $ +pyflyte register workflows --image ghcr.io/flyteorg/flytekit:py3.9-latest +``` + +````{note} +You can also specify multiple workflow directories, like: + +```{prompt} bash $ +pyflyte register ... +``` + +This is useful in cases where you want to register two different Flyte projects +that you maintain in a single place. + +```` + +Once you've successfully registered your workflows, you can execute them by +going to the Flyte console. If you're using a local Flyte demo cluster, you can +go to the browser at `localhost:30080/console` and do the following: + + - Navigate to the **flytesnacks** > **development** domain. +- Click on the **Workflows** section of the left-hand sidebar. +- Click on the **workflows.example.wf** card on the workflows list. +- Click on the **Launch Workflow** button on the top-right corner. +- Fill in an input **name** and click on the **Launch** button. + +![](https://raw.githubusercontent.com/flyteorg/static-resources/main/flytesnacks/getting_started/getting-started-flyte-ui.png) + +```{note} +In the next guide you'll learn about how to run your workflows programmatically. +``` + +#### Fast Registration + +`pyflyte register` packages up your code through a mechanism called +**fast registration**. Fast registration is useful when you already have a +container image that's hosted in your container registry of choice and you change +your workflow/task code _without any changes in your system-level/Python +dependencies_. At a high level, fast registration: + +1. 📦 **Packages** and zips up the directory/file that you specify as the argument to + `pyflyte register`, along with any files in the root directory of your + project. The result of this is a tarball that is packaged into a `.tar.gz` + file, which also includes the serialized task (in `protobuf` format) and workflow specifications + defined in your workflow code. +2. 🚢 **Registers** the Flyte package to the specified Flyte cluster and uploads the + tarball containing the user-defined code into the configured blob store + (e.g. `s3`, `gcs`). + +At workflow execution time, Flyte knows to automatically inject the zipped up +task/workflow code into the running container, thereby overriding the user-defined +tasks/workflows that were originally baked into the image. + +```{admonition} Ignoring files during fast registration +:class: important + +In step (1) of the fast registration process, by default Flyte will package up +all user-defined code at the root of your project. In some cases, your project +directory may contain datasets, model files, and other potentially large +artifacts that you want to exclude from the tarball. + +You can do so by specifying these files in a `.gitignore` or `.dockerignore` +file in the root directory of your project. +``` + +### Productionizing your Workflows + +Flyte's core design decision is to make workflows reproducible and repeatable. +One way it achieves this is by providing a way for you to bake-in user-defined +workflows and all of their dependencies into a Docker container. + +The third method of registering your workflows uses two commands: + +- `pyflyte package`: packages your tasks and workflows into `protobuf` format. +- `flytectl register`: registers the Flyte package to the configured cluster. + +This is the production-grade registration flow that we recommend because this +method ensures that the workflows are fully containerized, which ensures that +the system- and Python-level dependencies along with your workflow source code +are immutable. + +(containerizing_your_project)= + +#### Containerizing your Project + +Flyte relies on OCI-compatible containers to package up your code and third-party +dependencies. When you invoke `pyflyte init`, the resulting template project +ships with a `docker_build.sh` script that you can use to build and tag a +container according to the recommended practice: + +```{prompt} bash $ +./docker_build.sh +``` + +```{important} +By default, the `docker_build.sh` script: + +- Uses the `PROJECT_NAME` specified in the `pyflyte init` command, which in + this case is `my_project`. +- Will not use any remote registry. +- Uses the git sha to version your tasks and workflows. +``` + +You can override the default values with the following flags: + +```{prompt} bash $ +./docker_build.sh -p -r -v +``` + +For example, if you want to push your Docker image to Github's container +registry you can specify the `-r ghcr.io` flag. + +```{note} +The `docker_build.sh` script is purely for convenience; you can always roll +your own way of building Docker containers. +``` + +Once you've built the image, you can push it to the specified registry. For +example, if you're using Github container registry, do the following: + +```{prompt} bash $ +docker login ghcr.io +docker push +``` + +```{admonition} Pulling Private Images +:class: important + +For many projects it's convenient to make your images public, but in the case +that you're building proprietary images or images that may contain sensitive +metadata/configuration, it's more secure if they're private. + +Learn more about how to pull private image in the {ref}`User Guide `. +``` + +#### Package your Project with `pyflyte package` + +You can package your project with the `pyflyte package` command like so: + +```{prompt} bash $ +pyflyte --pkgs workflows package --image ghcr.io/flyteorg/flytekit:py3.9-latest +``` + +````{div} shadow p-3 mb-8 rounded + +**Expected Output:** + +```{code-block} bash +Successfully serialized 4 flyte objects + Packaging workflows.example.say_hello -> 0_workflows.example.say_hello_1.pb + Packaging workflows.example.greeting_length -> 1_workflows.example.greeting_length_1.pb + Packaging workflows.example.wf -> 2_workflows.example.wf_2.pb + Packaging workflows.example.wf -> 3_workflows.example.wf_3.pb +Successfully packaged 4 flyte objects into /Users/nielsbantilan/sandbox/my_project/flyte-package.tgz +``` + +```` + +This will create a portable package `flyte-package.tgz` containing all the Flyte +entities compiled as protobuf files that you can register with multiple Flyte +clusters. + +````{note} +Like `pyflyte register`, can also specify multiple workflow directories, like: + +```{prompt} bash $ +pyflyte --pkgs --pkgs package ... +``` + +This is useful in cases where you want to register two different Flyte projects +that you maintain in a single place. + +```` + +#### Register with `flytectl register` + +Finally, register your tasks and workflows with `flytectl register files`: + +```{prompt} bash $ +flytectl register files \ + --project flytesnacks \ + --domain development \ + --archive flyte-package.tgz \ + --version "$(git rev-parse HEAD)" +``` + +Let's break down what each flag is doing here: + +- `--project`: A project is a Flyte concept for built-in multi-tenancy so that + you can logically group tasks and workflows. The Flyte demo cluster ships with + a default project called `flytesnacks`. +- `--domain`: A domain enables workflows to be executed in different environment, + with separate resource isolation and feature configurations. The Flyte demo + cluster ships with three default domains: `development`, `staging`, and + `production`. +- `--archive`: This argument allows you to pass in a package file, which in + this case is `flyte-package.tgz`. +- `--version`: This is a version string that can be any string, but we recommend + using the git sha in general, especially in production use cases. + +### Using `pyflyte register` versus `pyflyte package` + `flytectl register` + +As a rule of thumb, `pyflyte register` works well in a single Flyte cluster where +you are iterating quickly on your task/workflow code. + +On the other hand, `pyflyte package` and `flytectl register` is appropriate if +you're: + +- Working with multiple Flyte clusters since it uses a portable package +- Deploying workflows to a production context +- Testing your Flyte workflows in your CI/CD infrastructure. + +```{admonition} Programmatic Python API +:class: important + +You can also perform the equivalent of the three methods of registration using +a {py:class}`~flytekit.remote.remote.FlyteRemote` object. You can learn more +about how to do this {ref}`here `. +``` + +## CI/CD with Flyte and GitHub Actions + +You can use any of the commands we learned in this guide to register, execute, +or test Flyte workflows in your CI/CD process. The core Flyte team maintains +two GitHub actions that facilitates this: + +- [`flyte-setup-action`](https://github.com/unionai-oss/flytectl-setup-action): + This action handles the installation of `flytectl` in your action runner. +- [`flyte-register-action`](https://github.com/unionai-oss/flyte-register-action): + This action uses `flytectl register` under the hood to handle registration + of Flyte packages, for example, the `.tgz` archives that are created by + `pyflyte package`. + +## What's Next? + +In this guide, you learned about the Flyte demo cluster, Flyte configuration, and +the different registration patterns you can leverage during the workflow +development lifecycle. In the next guide, we'll learn how to run and schedule +workflows programmatically. diff --git a/docs/introduction/flyte_fundamentals/run_schedule.md b/docs/introduction/flyte_fundamentals/run_schedule.md new file mode 100644 index 00000000000..96ef82909f4 --- /dev/null +++ b/docs/introduction/flyte_fundamentals/run_schedule.md @@ -0,0 +1,350 @@ +--- +jupytext: + formats: md:myst + text_representation: + extension: .md + format_name: myst +kernelspec: + display_name: Python 3 + language: python + name: python3 +--- + +(getting_started_run_and_schedule)= + +# Running and Scheduling Workflows + +Flyte supports the development and debugging of tasks and workflows in a local +setting, which increases the iteration speed of building out your data- +or machine-learning-driven applications. + +Running your workflows locally is great for the initial stages of testing, but +what if you need to make sure that they run as intended on a Flyte cluster? +What if you need to run them on a regular cadence? + +In this guide we'll cover how to run and schedule workflows for both development +and production use cases. + +```{admonition} Prerequisites +:class: important + +This guide assumes that you've completed the previous guides for +{doc}`Creating a Flyte project ` and +{ref}`Packaging and Registering Workflows `. +``` + +## Create a `FlyteRemote` Object + +In {doc}`"Running a workflow locally" `, you saw +how to run Flyte workflows with `pyflyte run` in the case that you're working +with standalone scripts. + +Once you're working with larger projects where you've registered workflows +to a Flyte cluster, we recommend using the {py:class}`~flytekit.remote.remote.FlyteRemote` +client to run workflows from a Python runtime. First, let's create a `FlyteRemote` +object: + +```{code-cell} ipython3 +:tags: [remove-output] + +from flytekit.configuration import Config +from flytekit.remote import FlyteRemote + +remote = FlyteRemote( + config=Config.auto(), + default_project="flytesnacks", + default_domain="development", +) +``` + +## Running a Workflow + +You can run workflows using the `FlyteRemote` {py:meth}`~flytekit.remote.remote.FlyteRemote.execute` +method, where you need to pass in a dictionary of `inputs` that adhere to the +interface defined by the workflow. + +`````{tabs} + +````{group-tab} Locally Imported + +If you have access to the `@workflow`-decorated function in your Python runtime +environment, you can import and execute it directly: + +Before execute it directly, you need to register the workflow first. + +```{prompt} bash $ +pyflyte register wf.py +``` + +```{code-block} python + +from workflows.example import wf + +execution = remote.execute( + wf, + inputs={"name": "Kermit"}, +) +``` + +```` + +````{group-tab} Remotely Fetched + +Execute a workflow by fetching a `FlyteWorkflow` object from the remote +**FlyteAdmin** service, which essentially contains the metadata representing a +Flyte workflow that exists on a Flyte cluster backend. + +```{code-block} python +flyte_wf = remote.fetch_workflow(name="workflows.example.wf") +execution = remote.execute(flyte_wf, inputs={"name": "Kermit"}) +``` + +```` + +````` + +```{note} +You can also launch workflows via `flytectl` which you can learn more about in +the {ref}`User Guide `. +``` + +## Running a Launchplan + +Similar to workflows, you can run launch plans with `FlyteRemote`: + +`````{tabs} + +````{group-tab} Locally Imported + +If you have a `LaunchPlan` defined in your Python runtime environment, you can +execute it directly: + +```{code-block} python + +from workflows.example import wf + +launch_plan = LaunchPlan.get_or_create( + wf, name="launch_plan", default_inputs={"name": "Elmo"}, +) + +execution = remote.execute(launch_plan, inputs={}) +``` + +```` + +````{group-tab} Remotely Fetched + +Execute a task by fetching a `FlyteLaunchPlan` object from the remote +**FlyteAdmin** service, which essentially contains the metadata representing a +Flyte task that exists on a Flyte cluster backend. + +This example assumes that you've added a `launch_plan` with some default inputs +to the `example.py` script and registered it to the backend: + +```{code-block} python +flyte_launchplan = remote.fetch_launch_plan(name="workflows.example.launch_plan") +execution = remote.execute(flyte_launchplan, inputs={}) +``` + +```` + +````` + +## Running a Task + +You can also run individual tasks on a Flyte cluster using `FlyteRemote`: + +`````{tabs} + +````{group-tab} Locally Imported + +If you have access to the `@task`-decorated function in your Python runtime +environment, you can import and execute it directly: + +```{code-block} python + +from workflows.example import say_hello + +execution = remote.execute(say_hello, inputs={"name": "Kermit"}) +``` + +```` + +````{group-tab} Remotely Fetched + +Execute a task by fetching a `FlyteWorkflow` object from the remote +**FlyteAdmin** service, which essentially contains the metadata representing a +Flyte task that exists on a Flyte cluster backend. + +```{code-block} python +flyte_task = remote.fetch_task(name="workflows.example.say_hello") +execution = remote.execute(flyte_task, inputs={"name": "Kermit"}) +``` + +```` + +````` + +```{note} +You can also launch tasks via `flytectl`, learn more in the {ref}`User Guide ` +``` + +## Fetching Inputs and Outputs of an Execution + +By default, {py:meth}`FlyteRemote.execute ` +is non-blocking, but you can also pass in `wait=True` to make it synchronously +wait for the task or workflow to complete. + +Print out the Flyte console url corresponding to your execution with: + +```{code-block} python +print(f"Execution url: {remote.generate_console_url(execution)}") +``` + +Synchronize the state of the Flyte execution object with the remote state during +execution with the {py:meth}`~flytekit.remote.remote.FlyteRemote.sync` method: + +```{code-block} python +synced_execution = remote.sync(execution) +print(synced_execution.inputs) # print out the inputs +``` + +You can also wait for the execution after you've launched it and access the +outputs: + +```{code-block} python +completed_execution = remote.wait(execution) +print(completed_execution.outputs) # print out the outputs +``` + +## Scheduling a Launch Plan + +Finally, you can create a {py:class}`~flytekit.LaunchPlan` that's scheduled +to run at a particular cadence by specifying the `schedule` argument: + +```{code-block} python +from flytekit import LaunchPlan, CronSchedule + +from workflows.example import wf + + +launch_plan = LaunchPlan.get_or_create( + wf, + name="wf_launchplan", + # run this launchplan every minute + schedule=CronSchedule(schedule="*/1 * * * *"), + default_inputs={"name": "Elmo"}, +) +``` + +You can also specify a fixed-rate interval: + +```{code-block} python +from datetime import timedelta +from flytekit import FixedRate + + +launch_plan = LaunchPlan.get_or_create( + wf, + name="wf_launchplan", + schedule=FixedRate(duration=timedelta(minutes=1)), + default_inputs={"name": "Elmo"}, +) +``` + +### Passing in the Scheduled Kick-off Time + +Suppose that your workflow is parameterized to take in a `datetime` argument, +which determines how the workflow is executed (e.g. reading in data using the +current date). + +You can specify a `kickoff_time_input_arg` in the schedule so that it +automatically passes the cron schedule kick-off time into the workflow: + +```{code-cell} ipython +from datetime import datetime +from flytekit import workflow, LaunchPlan, CronSchedule + + +@workflow +def process_data_wf(kickoff_time: datetime): + # read data and process it based on kickoff_time + ... + +process_data_lp = LaunchPlan.get_or_create( + process_data_wf, + name="process_data_lp", + schedule=CronSchedule( + schedule="*/1 * * * *", + kickoff_time_input_arg="kickoff_time", + ) +) +``` + +### Registering Launch Plans + +Any of the methods described in the {doc}`package_register` guide will register +a launchplan as long as it's defined in any of the Python modules that you +want to register to a Flyte backend. + +### Activating a Schedule + +Once you've registered your launch plan, You can use the `FlyteRemote` client or +the `flytectl` CLI to activate the schedule: + +:::::{tabs} + +::::{group-tab} `FlyteRemote` + +:::{code-block} +launchplan_id = remote.fetch_launch_plan(name="process_data_lp").id +remote.client.update_launch_plan(launchplan_id, "ACTIVE") +::: + +:::: + +::::{group-tab} `flytectl` + +:::{code-block} bash +flytectl update launchplan -p flyteexamples -d development \ + process_data_lp --version --activate +::: + +:::: + +::::: + +### Deactivating a Schedule + +Similarly, you can deactivate a launchplan with: + +:::::{tabs} + +::::{group-tab} `FlyteRemote` + +:::{code-block} +launchplan_id = remote.fetch_launch_plan(name="process_data_lp").id +remote.client.update_launch_plan(launchplan_id, "INACTIVE") +::: + +:::: + +::::{group-tab} `flytectl` + +:::{code-block} bash +flytectl update launchplan -p flyteexamples -d development \ + process_data_lp --version --archive +::: + +:::: + +::::: + +## What's Next? + +In this guide, you learned about how to: + +- Run tasks, workflows, and launch plans using `FlyteRemote`. +- Create a cron schedule to run a launch plan at a specified time interval. + +In the next guide, you'll learn how to visualize tasks using Flyte Decks. diff --git a/docs/introduction/flyte_fundamentals/tasks_and_workflows.md b/docs/introduction/flyte_fundamentals/tasks_and_workflows.md new file mode 100644 index 00000000000..ee4894d5668 --- /dev/null +++ b/docs/introduction/flyte_fundamentals/tasks_and_workflows.md @@ -0,0 +1,339 @@ +--- +jupytext: + formats: md:myst + text_representation: + extension: .md + format_name: myst +kernelspec: + display_name: Python 3 + language: python + name: python3 +--- + +(getting_started_tasks_and_workflows)= + +# Tasks, Workflows and LaunchPlans + +In {doc}`"Getting started with workflow development" `, we got a basic sense +of how Flyte works by creating and running a basic workflow made up of a few tasks. +In this guide, you'll learn more about how tasks and workflows fit into the Flyte +programming model. + +## Tasks + +Flyte tasks are the core building blocks of larger, more complex workflows. + +### Tasks are Containerized Blocks of Compute + +You can think of a Flyte task as a containerized block of compute. When a task +runs on a Flyte backend, it's isolated within its own container, separate from +all other tasks. Consider this simple one: + +```{code-cell} ipython3 +from typing import List +from flytekit import task + +@task +def mean(values: List[float]) -> float: + return sum(values) / len(values) +``` + +As you can see, a task is just a regular Python function that's decorated +with {py:func}`@task `. We can run this function just like any +other Python function: + +```{code-cell} ipython3 +mean(values=[float(i) for i in range(1, 11)]) +``` + +```{important} +There are three important things to note here: + +- Most of the Flyte tasks you'll ever write can be executed locally. +- Tasks and workflows must be invoked with keyword arguments. +- When a task runs on a Flyte cluster, it runs on a + [Kubernetes Pod](https://kubernetes.io/docs/concepts/workloads/pods/), where + Flyte orchestrates what task to run at what time in the context of a workflow. +``` + +### Tasks are Strongly Typed + +You might also notice that the `mean` function signature is type-annotated with +Python type hints. Flyte uses these annotations to check the input and output +types of the task when it's compiled or invoked. + +Under the hood, Flyte uses its own type system that translates values to and from +Flyte types and the SDK language types, in this case Python. The Flyte type +system uses Python type annotations to make sure that the data passing through +tasks and workflows are compatible with the explicitly stated types that we +define through a function signature. + +So if we call the `mean` function with the wrong types, we get an error: + +```{code-cell} ipython3 +try: + mean(values="hi") +except Exception as e: + print(e) +``` + +This may not seem like much for this simple example, but as you start dealing +with more complex data types and pipelines, Flyte's type system becomes +invaluable for catching bugs early. + +Flyte's type system is also used for caching, data lineage tracking, and +automatic serialization and deserialization of data as it's passed from one task +to another. You can learn more about it in the {ref}`User Guide `. + +## Workflows + +Workflows compose multiple tasks – or other workflows – into meaningful steps +of computation to produce some useful set of outputs or outcomes. + +Suppose the `mean` task is just one building block of a larger computation. +This is where Flyte workflows can help us manage the added complexity. + +### Workflows Build Execution Graphs + +Suppose that we want to mean-center and standard-deviation-scale a set of +values. In addition to a `mean` function, we also need to compute standard +deviation and implement the centering and scaling logic. + +Let's go ahead and implement those as tasks: + +```{code-cell} ipython3 +from math import sqrt +from flytekit import workflow + + +@task +def standard_deviation(values: List[float], mu: float) -> float: + variance = sum([(x - mu) ** 2 for x in values]) + return sqrt(variance) + +@task +def standard_scale(values: List[float], mu: float, sigma: float) -> List[float]: + return [(x - mu) / sigma for x in values] +``` + +Then we put all the pieces together into a workflow, which is a function +that's decorated with {py:func}`@workflow `: + +```{code-cell} ipython3 +@workflow +def standard_scale_workflow(values: List[float]) -> List[float]: + mu = mean(values=values) + sigma = standard_deviation(values=values, mu=mu) + return standard_scale(values=values, mu=mu, sigma=sigma) +``` + +Just like tasks, workflows are executable in a regular Python runtime: + +```{code-cell} ipython3 +standard_scale_workflow(values=[float(i) for i in range(1, 11)]) +``` + +(workflows_versus_task_syntax)= + +### Workflows versus Tasks Under the Hood + +Although Flyte workflow syntax looks like Python code, it's actually a +[domain-specific language (DSL)](https://en.wikipedia.org/wiki/Domain-specific_language) +for building execution graphs where tasks – and other workflows – serve as the +building blocks. + +This means that the workflow function body only supports a subset of Python's +semantics: + +- In workflows, you shouldn't use non-deterministic operations like + `rand.random`, `time.now()`, etc. These functions will be invoked at compile + time and your workflows will not behave as you expect them to. +- Within workflows, the inputs of workflow and the outputs of tasks function are promises under the hood, + so you can't access and operate on them like typical Python function outputs. + _You can only pass promises into tasks, workflows, and other Flyte constructs_. +- Regular Python conditionals won't work as intended in workflows: you need to + use the {ref}`conditional ` construct. + +In contrast to workflow code, the code within tasks is actually executed by a +Python interpreter when it's run locally or inside a container when run on a +Flyte cluster. + +### Workflows Deal with Promises + +A promise is essentially a placeholder for a value that hasn't been +materialized yet. To show you what this means concretely, let's re-define +the workflow above but let's also print the output of one of the tasks: + +```{code-cell} ipython3 +@workflow +def standard_scale_workflow_with_print(values: List[float]) -> List[float]: + mu = mean(values=values) + print(mu) # this is not the actual float value! + sigma = standard_deviation(values=values, mu=mu) + return standard_scale(values=values, mu=mu, sigma=sigma) +``` + +We didn't even execute the workflow and we're already seeing the value of `mu`, +which is a promise. So what's happening here? + +When we decorate `standard_scale_workflow_with_print` with `@workflow`, Flyte +compiles an execution graph that's defined inside the function body, so +_it doesn't actually run the computations yet_. Therefore, when Flyte compiles a +workflow, the outputs of task calls are actually promises and not regular python +values. + +### Workflows are Strongly Typed Too + +Since both tasks and workflows are strongly typed, Flyte can actually catch +type errors! When we learn more about packaging and registering in the next few +guides, we'll see that Flyte can also catch compile-time errors even before +you running any code! + +For now, however, we can run the workflow locally to see that we'll get an +error if we introduce a bug in the `standard_scale` task. + +```{code-cell} ipython3 +@task +def buggy_standard_scale(values: List[float], mu: float, sigma: float) -> float: + """ + 🐞 The implementation and output type of this task is incorrect! It should + be List[float] instead of a sum of all the scaled values. + """ + return sum([(x - mu) / sigma for x in values]) + +@workflow +def buggy_standard_scale_workflow(values: List[float]) -> List[float]: + mu = mean(values=values) + sigma = standard_deviation(values=values, mu=mu) + return buggy_standard_scale(values=values, mu=mu, sigma=sigma) + +try: + buggy_standard_scale_workflow(values=[float(i) for i in range(1, 11)]) +except Exception as e: + print(e) +``` + +### Workflows can be Embedded in Other Workflows + +When a workflow uses another workflow as part of the execution graph, we call +the inner workflow a **subworkflow**. Subworkflows are strongly typed and can +be invoked just like tasks when defining the outer workflow. + +For example, we can embed `standard_scale_workflow` inside +`workflow_with_subworkflow`, which uses a `generate_data` task to supply the +data for scaling: + +```{code-cell} ipython3 +import random + +@task +def generate_data(num_samples: int, seed: int) -> List[float]: + random.seed(seed) + return [random.random() for _ in range(num_samples)] + +@workflow +def workflow_with_subworkflow(num_samples: int, seed: int) -> List[float]: + data = generate_data(num_samples=num_samples, seed=seed) + return standard_scale_workflow(values=data) + +workflow_with_subworkflow(num_samples=10, seed=3) +``` + +```{important} +Learn more about subworkflows in the {ref}`User Guide `. +``` + +### Specifying Dependencies without Passing Data + +You can also specify dependencies between tasks and subworkflows without passing +data from the upstream entity to the downstream entity using the `>>` right shift +operator: + +```{code-block} python +@workflow +def wf(): + promise1 = task1() + promise2 = task2() + promise3 = subworkflow() + promise1 >> promise2 + promise2 >> promise3 +``` + +In this workflow, `task1` will execute before `task2`, but it won't pass any of +its data to `task2`. Similarly, `task2` will execute before `subworkflow`. + +```{important} +Learn more about chaining flyte entities in the {ref}`User Guide `. +``` + +## Launch plans + +A Flyte {py:class}`~flytekit.LaunchPlan` is a partial or complete binding of +inputs necessary to launch a workflow. You can think of it like +the {py:func}`~functools.partial` function in the Python standard library where +you can define default (overrideable) and fixed (non-overrideable) inputs. + +```{note} +Additionally, `LaunchPlan`s provides an interface for specifiying run-time +overrides such as notifications, schedules, and more. +``` + +Create a launch plan like so: + +```{code-cell} ipython3 +from flytekit import LaunchPlan + +standard_scale_launch_plan = LaunchPlan.get_or_create( + standard_scale_workflow, + name="standard_scale_lp", + default_inputs={"values": [3.0, 4.0, 5.0]} +) +``` + +### Invoking LaunchPlans Locally + +You can run a `LaunchPlan` locally. This is, using the local Python interpreter (REPL). It will use the `default_inputs` dictionary +whenever it's invoked: + +```{code-cell} ipython3 +standard_scale_launch_plan() +``` + +Of course, these defaults can be overridden: + +```{code-cell} ipython3 +standard_scale_launch_plan(values=[float(x) for x in range(20, 30)]) +``` + +Later, you'll learn how to run a launch plan on a `cron` schedule, but for +the time being you can think of them as a way for you to templatize workflows +for some set of related use cases, such as model training with a fixed dataset +for reproducibility purposes. + +### LaunchPlans can be Embedded in Workflows + +Similar to subworkflows, launchplans can be used in a workflow definition: + +```{code-cell} ipython3 +@workflow +def workflow_with_launchplan(num_samples: int, seed: int) -> List[float]: + data = generate_data(num_samples=num_samples, seed=seed) + return standard_scale_launch_plan(values=data) + +workflow_with_launchplan(num_samples=10, seed=3) +``` + +The main difference between subworkflows and launch plans invoked in workflows is +that the latter will kick off a new workflow execution on the Flyte cluster with +its own execution name, while the former will execute the workflow in the +context of the parent workflow's execution context. + +```{important} +Learn more about subworkflows in the {ref}`User Guide `. +``` + +## What's Next? + +So far we've been working with small code snippets and self-contained scripts. +Next, we'll see how to organize a Flyte project that follows software +engineering best practices, including organizing code into meaningful modules, defining third-party dependencies, and creating a container image for making our workflows reproducible. diff --git a/docs/introduction/flyte_fundamentals/visualizing_task_input_and_output.md b/docs/introduction/flyte_fundamentals/visualizing_task_input_and_output.md new file mode 100644 index 00000000000..8c793d9776f --- /dev/null +++ b/docs/introduction/flyte_fundamentals/visualizing_task_input_and_output.md @@ -0,0 +1,296 @@ +--- +jupytext: + formats: md:myst + text_representation: + extension: .md + format_name: myst +kernelspec: + display_name: Python 3 + language: python + name: python3 +--- + +(getting_started_visualizing_task_input_and_output)= + +# Visualizing task input and output + +Flyte {py:class}`~flytekit.deck.Deck`s are one of the first-class constructs in +Flyte, allowing you to generate static HTML reports associated with any of the +outputs materialized within your tasks. + +You can think of Decks as stacks of HTML snippets that are logically grouped by +tabs. By default, every task has three decks: an **input**, an **output**, and a +**default** deck. + +Flyte materializes Decks via `Renderer`s, which are specific implementations of +how to generate an HTML report from some Python object. + +## Enabling Flyte Decks + +To enable Flyte Decks, simply set `disable_deck=False` in the `@task` decorator: + +```{code-cell} ipython3 +import pandas as pd +from flytekit import task, workflow + + +@task(disable_deck=False) +def iris_data() -> pd.DataFrame: + ... +``` + +Specifying this flag indicates that Decks should be rendered whenever this task +is invoked. + +## Rendering Task Inputs and Outputs + +By default, Flyte will render the inputs and outputs of tasks with the built-in +renderers in the corresponding **input** and **output** {py:class}`~flytekit.deck.Deck`s, +respectively. In the following task, we load the iris dataset using the `plotly` package. + +```{code-cell} ipython3 + +import plotly.express as px +from typing import Optional + +from flytekit import task, workflow + + +@task(disable_deck=False) +def iris_data( + sample_frac: Optional[float] = None, + random_state: Optional[int] = None, +) -> pd.DataFrame: + data = px.data.iris() + if sample_frac is not None: + data = data.sample(frac=sample_frac, random_state=random_state) + return data + + +@workflow +def wf( + sample_frac: Optional[float] = None, + random_state: Optional[int] = None, +): + iris_data(sample_frac=sample_frac, random_state=random_state) +``` + +Then, invoking the workflow containing a deck-enabled task will render the +following reports for the input and output data in an HTML file, which you can +see in the logs: + +```{code-cell} ipython3 +--- +tags: [remove-input] +--- + +# this is an unrendered cell, used to capture the logs in order to render the +# Flyte Decks directly in the docs. +import datetime +import logging +import os +import re +import shutil +from pythonjsonlogger import jsonlogger +from IPython.display import HTML +from pathlib import Path + + +class DeckFilter(logging.Filter): + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.formatter = jsonlogger.JsonFormatter( + fmt="%(asctime)s %(name)s %(levelname)s %(message)s" + ) + self.logs = [] + self.deck_files = {} + + def filter(self, record): + patt = "(.+) task creates flyte deck html to (.+/deck.html)" + msg = record.getMessage() + matches = re.match(patt, msg) + + if msg == "Connection error. Skip stats collection.": + return False + + if matches: + task, filepath = matches.group(1), matches.group(2) + self.logs.append(self.formatter.format(record)) + self.deck_files[task] = re.sub("^file://", "", filepath) + return False + +def cp_deck(src): + src = Path(src) + target = Path.cwd() / "_flyte_decks" / src.parent.name + target.mkdir(parents=True, exist_ok=True) + shutil.copy(src, target) + return target / "deck.html" + + +logger = logging.getLogger("flytekit") +logger.setLevel(20) + +deck_filter = DeckFilter() +logger.addFilter(deck_filter) +``` + +```{code-cell} ipython3 +--- +tags: [remove-output] +--- +wf(sample_frac=1.0, random_state=42) +``` + +```{code-cell} ipython3 +--- +tags: [remove-input] +--- +logger.removeFilter(deck_filter) +for log in deck_filter.logs: + print(log) +``` + +```{note} +To see where the HTML file is written to when you run the deck-enabled tasks +locally, you need to set the `FLYTE_SDK_LOGGING_LEVEL` environment variable +to `20`. Doing so will emit logs that look like the above print statement, +where the `deck.html` filepath can be found in the `message` key. +``` + +## Rendering In-line Decks + +You can render Decks inside the task function body by using the **default** +deck, which you can access with the {py:func}`~flytekit.current_context` +function. In the following example, we extend the `iris_data` task with: + +- A markdown snippet to provide more context about what the task does. +- A boxplot of the `sepal_length` variable using {py:class}`~flytekitplugins.deck.renderer.BoxRenderer`, + which leverages the `plotly` package to auto-generate a set of plots and + summary statistics from the dataframe. + +```{code-cell} ipython3 +import flytekit +from flytekitplugins.deck.renderer import MarkdownRenderer, BoxRenderer + +@task(disable_deck=False) +def iris_data( + sample_frac: Optional[float] = None, + random_state: Optional[int] = None, +) -> pd.DataFrame: + data = px.data.iris() + if sample_frac is not None: + data = data.sample(frac=sample_frac, random_state=random_state) + + md_text = ( + "# Iris Dataset\n" + "This task loads the iris dataset using the `plotly` package." + ) + flytekit.current_context().default_deck.append(MarkdownRenderer().to_html(md_text)) + flytekit.Deck("box plot", BoxRenderer("sepal_length").to_html(data)) + return data +``` + +This will create new tab in the Flyte Deck HTML view named **default**, which +should contain the markdown text we specified. + +(getting_started_customer_renderers)= + +## Custom Renderers + +What if we don't want to show raw data values in the Flyte Deck? We can create a +pandas dataframe renderer that summarizes the data instead of showing raw values +by creating a custom renderer. A renderer is essentially a class with a +`to_html` method. + +```{code-cell} ipython +class DataFrameSummaryRenderer: + + def to_html(self, df: pd.DataFrame) -> str: + assert isinstance(df, pd.DataFrame) + return df.describe().to_html() +``` + +Then we can use the `Annotated` type to override the default renderer of the +`pandas.DataFrame` type: + +```{code-cell} ipython3 +--- +tags: [remove-output] +--- + +try: + from typing import Annotated +except ImportError: + from typing_extensions import Annotated + + +@task(disable_deck=False) +def iris_data( + sample_frac: Optional[float] = None, + random_state: Optional[int] = None, +) -> Annotated[pd.DataFrame, DataFrameSummaryRenderer()]: + data = px.data.iris() + if sample_frac is not None: + data = data.sample(frac=sample_frac, random_state=random_state) + + md_text = ( + "# Iris Dataset\n" + "This task loads the iris dataset using the `plotly` package." + ) + flytekit.current_context().default_deck.append(MarkdownRenderer().to_html(md_text)) + flytekit.Deck("box plot", BoxRenderer("sepal_length").to_html(data)) + return data +``` + +Finally, we can run the workflow and embed the resulting html file by parsing +out the filepath from logs: + +```{code-cell} ipython3 +--- +tags: [remove-input] +--- +import warnings + +@workflow +def wf( + sample_frac: Optional[float] = None, + random_state: Optional[int] = None, +): + iris_data(sample_frac=sample_frac, random_state=random_state) + +deck_filter = DeckFilter() +logger.addFilter(deck_filter) + +with warnings.catch_warnings(): + warnings.simplefilter("ignore") + wf(sample_frac=1.0, random_state=42) + +logger.removeFilter(deck_filter) +HTML(filename=cp_deck(deck_filter.deck_files["iris_data"])) +``` + +As you can see above, Flyte renders in-line decks in the order in which they are +called in the task function body: **default**, then **box plot**, which contain +the markdown description and box plot, respectively. + +The built-in decks for the **input** arguments for primitive types like `int` +and `float` are barebones, simply showing the values, and the **output** argument +contains the output of our custom `DataFrameSummaryRenderer` to show a summary +of our dataset. + +```{admonition} Learn more +:class: important + +Flyte Decks are simple to customize, as long as you can render the Python object +into some HTML representation. + +Learn more about Flyte Decks in the {ref}`User Guide `. +``` + +## What's Next? + +In this guide, you learned how to generate static HTML reports to gain more +visibility into Flyte tasks. In the next guide, you'll learn how to optimize +your tasks via caching, retries, parallelization, resource allocation, and +plugins. diff --git a/docs/introduction/getting_started_with_workflow_development/creating_a_flyte_project.md b/docs/introduction/getting_started_with_workflow_development/creating_a_flyte_project.md new file mode 100644 index 00000000000..608ff9b86a4 --- /dev/null +++ b/docs/introduction/getting_started_with_workflow_development/creating_a_flyte_project.md @@ -0,0 +1,81 @@ +--- +jupytext: + formats: md:myst + text_representation: + extension: .md + format_name: myst +--- + +# Creating a Flyte project + +## About Flyte projects + +A Flyte project is a directory containing task and workflow code, internal Python source code, configuration files, and other artifacts structured according to software engineering best practices that enables you to package up your code so that it can be registered to a Flyte cluster. + +## Prerequisites + +* Follow the steps in {doc}`"Installing development tools" ` +* (Optional, but recommended) Install [git](https://git-scm.com/book/en/v2/Getting-Started-Installing-Git) + +## Steps + +### 1. Activate your Python virtual environment + +If you are using conda or another Python virtual environment manager, first, activate the virtual environment you will use to manage dependencies for your Flyte project: + +```{prompt} bash $ +conda activate flyte-example +``` + +### 2. Initialize your Flyte project + +Next, initialize your Flyte project. The [flytekit-python-template GitHub repository](https://github.com/flyteorg/flytekit-python-template) contains Flyte project templates with sample code that you can run as is or modify to suit your needs. + +In this example, we will initialize the [basic-example-imagespec project template](https://github.com/flyteorg/flytekit-python-template/tree/main/basic-example-imagespec). + +```{prompt} bash $ +pyflyte init my_project +``` + +:::{note} + +To initialize a Flyte project with a different template, use the `--template` parameter: + +`pyflyte init --template hello-world hello-world` +::: + +### 3. Install additional requirements + +After initializing your Flyte project, you will need to install requirements listed in `requirements.txt`: + +```{prompt} bash $ +cd my_project +pip install -r requirements.txt +``` + +### 4. (Optional) Version your Flyte project with git + +We highly recommend putting your Flyte project code under version control. To do so, initialize a git repository in the Flyte project directory: + +```{prompt} bash $ +git init +``` + +```{note} +If you are using a Dockerfile instead of ImageSpec, you will need to initialize a git repository and create at least one commit, since the commit hash is used to tag the image when it is built. +``` + +### 5. Run your workflow in a local Python environment + +To check that your Flyte project was set up correctly, run the workflow in a local Python environment: + +```{prompt} bash $ +cd workflows +pyflyte run example.py wf +``` + +## Next steps + +To learn about the parts of a Flyte project, including tasks and workflows, see {doc}`"Flyte project components" `. + +To run the workflow in your Flyte project in a local Flyte cluster, see the {ref}`"Running a workflow in a local cluster" ` section of {doc}`"Running a workflow locally" `. diff --git a/docs/introduction/getting_started_with_workflow_development/flyte_project_components.md b/docs/introduction/getting_started_with_workflow_development/flyte_project_components.md new file mode 100644 index 00000000000..172586b7d67 --- /dev/null +++ b/docs/introduction/getting_started_with_workflow_development/flyte_project_components.md @@ -0,0 +1,118 @@ +--- +jupytext: + formats: md:myst + text_representation: + extension: .md + format_name: myst +--- + +# Flyte project components + +A Flyte project is a directory containing task and workflow code, internal Python source code, configuration files, and other artifacts needed to package up your code so that it can be run on a Flyte cluster. + +## Directory structure + +If you look at the project you created with `pyflyte init` in {doc}`"Creating a Flyte project" `, you'll see the following directory structure: + +```{code-block} bash +my_project +├── LICENSE +├── README.md +├── requirements.txt # Python dependencies +└── workflows + ├── __init__.py + └── example.py # Example Flyte workflow code +``` + +(getting_started_python_dependencies)= + +## `requirements.txt` Python dependencies + +Most Flyte projects contain a `requirements.txt` file that you can modify to suit the needs of your project. + +You can specify pip-installable Python dependencies in your project by adding them to the +`requirements.txt` file. + +```{note} +We recommend using [pip-compile](https://pip-tools.readthedocs.io/en/latest/) to +manage your project's Python requirements. +``` + +````{dropdown} See requirements.txt + +```{rli} https://raw.githubusercontent.com/flyteorg/flytekit-python-template/main/simple-example/%7B%7Bcookiecutter.project_name%7D%7D/requirements.txt +:caption: requirements.txt +``` + +```` + +(getting_started_workflow_code)= + +## `example.py` workflow code + +Flyte projects initialized with `pyflyte init` contain a `workflows` directory, inside of which is a Python file that holds the workflow code for the application. + +The workflow code may contain ImageSpec configurations, and one or more task and workflow functions, decorated with the `@task` and `@workflow` decorators, respectively. + +```{note} +The workflow directory also contains an `__init__.py` file to indicate that the workflow code is part of a Python package. For more information, see the [Python documentation](https://docs.python.org/3/reference/import.html#regular-packages). +``` + +### ImageSpec + +The workflow code file in the basic template includes an optional ImageSpec configuration. ImageSpec is a Flyte feature that enables you to build a custom container image without having to write a Dockerfile. To learn more, see the [ImageSpec documentation](https://docs.flyte.org/projects/cookbook/en/latest/auto_examples/customizing_dependencies/image_spec.html#image-spec-example) + +```python +# basic_image = ImageSpec( +# name="flytekit", # rename this to your docker image name +# base_image="ghcr.io/flyteorg/flytekit:py3.11-1.10.2", +# # the base image that flytekit will use to build your image +# packages=["example-package"], # packages to add to the base image +# # remove "example-package" before using. +# registry="ghcr.io/unionai-oss", +# # the registry your image will be pushed to +# python_version="3.11" +# # the python version; optional if not different from the base image +# ) +``` + +```{note} +If you need to use a Dockerfile instead of ImageSpec, you will need to add a Dockerfile and a `docker_build.sh` script to the top-level directory of your project, and either remove any ImageSpec configurations from the workflow code file or leave them commented out. +``` + +### The `@task` and `@workflow` decorators + +* The `@task` and `@workflow` decorators can be parsed by Python provided that they are used only on functions at the top-level scope of the module. +* Task and workflow function signatures must be type-annotated with Python type hints. +* Tasks and workflows can be invoked like regular Python methods, and even imported and used in other Python modules or scripts. +* Task and workflow functions must be invoked with keyword arguments. + +#### `@task` + +The `@task` decorator indicates a Python function that defines a task. + +* A task is a Python function that takes some inputs and produces an output. +* Tasks are assembled into workflows. +* When deployed to a Flyte cluster, each task runs in its own [Kubernetes Pod](https://kubernetes.io/docs/concepts/workloads/pods/), where Flyte orchestrates what task runs at what time in the context of a workflow. + +```python +@task() +def say_hello(name: str) -> str: + return f"Hello, {name}!" +``` + +#### `@workflow` + +The `@workflow` decorator indicates a function-esque construct that defines a workflow. + +* Workflows specify the flow of data between tasks, and the dependencies between tasks. +* A workflow appears to be a Python function but is actually a [domain-specific language (DSL)](https://en.wikipedia.org/wiki/Domain-specific_language) that only supports a subset of Python syntax and semantics. +* When deployed to a Flyte cluster, the workflow function is "compiled" to construct the directed acyclic graph (DAG) of tasks, defining the order of execution of task pods and the data flow dependencies between them. + +```python +@workflow +def wf(name: str = "world") -> typing.Tuple[str, int]: + greeting = say_hello(name=name) + greeting_len = greeting_length(greeting=greeting) + return greeting, greeting_len +``` diff --git a/docs/introduction/getting_started_with_workflow_development/index.md b/docs/introduction/getting_started_with_workflow_development/index.md new file mode 100644 index 00000000000..8109db5c7d4 --- /dev/null +++ b/docs/introduction/getting_started_with_workflow_development/index.md @@ -0,0 +1,34 @@ +--- +# override the toc-determined page navigation order +prev-page: getting_started/quickstart_guide +prev-page-title: Quickstart guide +--- + +(getting_started_workflow_development)= +# Getting started with workflow development + +Machine learning engineers, data engineers, and data analysts represent the processes that consume, transform, and output data with directed acyclic graphs (DAGs). In this section, you will learn how to create a Flyte project to contain the workflow code that implements your DAG, as well as the configuration files needed to package the code to run on a local or remote Flyte cluster. + +```{list-table} +:header-rows: 0 +:widths: 20 30 + +* - {doc}`Installing development tools ` + - Install the tools needed to create Flyte projects and run workflows and tasks. +* - {doc}`Creating a Flyte project ` + - Create a Flyte project that contains workflow code and essential configuration files. +* - {doc}`Flyte project components ` + - Understand the directory structure, configuration files, and code in a Flyte project. +* - {doc}`Running a workflow locally ` + - Execute a workflow in a local Python environment or in a local Flyte cluster. +``` + +```{toctree} +:maxdepth: -1 +:hidden: + +installing_development_tools +creating_a_flyte_project +flyte_project_components +running_a_workflow_locally +``` diff --git a/docs/introduction/getting_started_with_workflow_development/installing_development_tools.md b/docs/introduction/getting_started_with_workflow_development/installing_development_tools.md new file mode 100644 index 00000000000..708e2cbca97 --- /dev/null +++ b/docs/introduction/getting_started_with_workflow_development/installing_development_tools.md @@ -0,0 +1,43 @@ +--- +jupytext: + formats: md:myst + text_representation: + extension: .md + format_name: myst +--- + +(getting_started_installing_development_tools)= + +# Installing development tools + +To create and run workflows in Flyte, you must install Python and Flytekit. We recommend installing conda or another Python virtual environment manager to manage Flytekit and other dependencies, but if you do not, you can install Flytekit with pip. + +## 1. Install Python + +Python version 3.8x or higher is supported. To install Python, follow the download instructions for your operating system on the [Python downloads site](https://www.python.org/downloads/). + +If you already have Python installed, you can use conda or pyenv to install the recommended version. + +## 2. (Optional but recommended) Install conda and create a virtual environment + +We strongly recommend installing conda with [miniconda](https://docs.conda.io/projects/miniconda/en/latest/) to manage Python versions and virtual environments. + +After installing conda, create and activate a virtual environment on the command line: + + +```{prompt} bash $ +conda create -n flyte-example python=3.10 -y +conda activate flyte-example +``` + +```{note} +You can also use other virtual environment managers, such as [pyenv](https://github.com/pyenv/pyenv) and [venv](https://docs.python.org/3/library/venv.html). +``` + +## 3. Install Flytekit + +To install or upgrade Flytekit, run the following command: + +```{prompt} bash +pip install -U flytekit +``` diff --git a/docs/introduction/getting_started_with_workflow_development/running_a_workflow_locally.md b/docs/introduction/getting_started_with_workflow_development/running_a_workflow_locally.md new file mode 100644 index 00000000000..80c8551f35b --- /dev/null +++ b/docs/introduction/getting_started_with_workflow_development/running_a_workflow_locally.md @@ -0,0 +1,168 @@ +--- +jupytext: + formats: md:myst + text_representation: + extension: .md + format_name: myst +--- + +# Running a workflow locally + +You can run a workflow locally in two ways: + +* **{ref}`In a local Python environment `:** To develop and test your code quickly without the overhead of setting up a local Flyte cluster, you can run your workflow in a local Python environment. +* **{ref}`In a local Flyte cluster `:** To test your code in a more production-like setting, you can run your workflow in a local cluster, such as the demo Flyte cluster. + +(getting_started_running_workflow_local_python_environment)= + +## Running a workflow in a local Python environment + +### Prerequisites + +* {doc}`Install development tools ` +* {doc}`Create a Flyte project ` + +### Steps + +1. On the command line, navigate to the workflows directory of your Flyte project: +```{prompt} bash $ +cd my_project/workflows +``` +2. Run the workflow with `pyflyte run`: +```{prompt} bash $ +pyflyte run example.py wf +``` + +:::{note} +While you can run the example file like a Python script with `python example.py`, we recommend using `pyflyte run` instead. To run the file like a Python script, you would have to add a `main` module conditional at the end of the script: +```python +if __name__ == "__main__": + print(wf()) +``` + +Your code would become even more verbose if you wanted to pass arguments to the workflow: +```python +if __name__ == "__main__": + from argparse import ArgumentParser + + parser = ArgumentParser() + parser.add_argument("--name", type=str) + + args = parser.parse_args() + print(wf(name=args.name)) +``` +::: + +(getting_started_running_workflow_local_cluster)= + +## Running a workflow in a local Flyte cluster + +### Prerequisites + +#### 1. Install development tools + +If you have not already done so, follow the steps in {doc}`"Installing development tools" ` to install Python, Flytekit, and optionally, conda. + +#### 2. Create a Flyte project + +If you have not already done so, follow the steps in {doc}`"Creating a Flyte project" ` to create a Flyte project. + +#### 3. Install Docker + +Follow the steps in the [Docker installation guide](https://docs.docker.com/get-docker/) to install Docker. + +Flyte supports any [OCI-compatible](https://opencontainers.org/) container technology (like [Podman](https://podman.io/), [LXD](https://linuxcontainers.org/lxd/introduction/), and [Containerd](https://containerd.io/)), but for the purpose of this documentation, `flytectl` uses Docker to start a local Kubernetes cluster that you can interact with on your machine. + +#### 4. Install `flytectl` + +You must install `flytectl` to start and configure a local Flyte cluster, as well as register workflows to a local or remote Flyte cluster. + +````{tabbed} macOS +To use Homebrew, on the command line, run the following: + +```{prompt} bash $ +brew install flyteorg/homebrew-tap/flytectl +``` + +To use `curl`, on the command line, run the following: + +```{prompt} bash $ +curl -sL https://ctl.flyte.org/install | sudo bash -s -- -b /usr/local/bin +``` + +To download manually, see the [flytectl releases](https://github.com/flyteorg/flytectl/releases). +```` + +````{tabbed} Linux +To use `curl`, on the command line, run the following: + +```{prompt} bash $ +curl -sL https://ctl.flyte.org/install | sudo bash -s -- -b /usr/local/bin +``` + +To download manually, see the [flytectl releases](https://github.com/flyteorg/flytectl/releases). +```` + +````{tabbed} Windows +To use `curl`, in a Linux shell (such as [WSL](https://learn.microsoft.com/en-us/windows/wsl/install)), on the command line, run the following: + +```{prompt} bash $ +curl -sL https://ctl.flyte.org/install | sudo bash -s -- -b /usr/local/bin +``` + +To download manually, see the [flytectl releases](https://github.com/flyteorg/flytectl/releases). +```` + +### Steps + +1. Export the `FLYTECTL_CONFIG` environment variable in your shell: + +```{prompt} bash $ +export FLYTECTL_CONFIG=~/.flyte/config-sandbox.yaml +``` +2. Start the Docker daemon. +3. Start the demo cluster: + +```{prompt} bash $ +flytectl demo start +``` +4. Create a project on the demo cluster to correspond to your local Flyte project: +```{prompt} bash $ +flytectl create project \ + --id "my-project" \ + --labels "my-label=my-project" \ + --description "My Flyte project" \ + --name "My project" +``` +3. On the command line, navigate to the workflows directory of your Flyte project: +```{prompt} bash $ +cd my_project/workflows +``` +4. Run the workflow on the Flyte cluster with `pyflyte run` using the `--remote` flag and additional parameters for the project name and domain. In this example, you can also optionally pass a `name` parameter to the workflow: +```{prompt} bash $ +pyflyte run --remote -p my-project -d development example.py wf --name Ada +``` + +You should see a URL to the workflow execution on your demo Flyte cluster, where `` is a unique identifier for the workflow execution: + +```{prompt} bash $ +Go to http://localhost:30080/console/projects/flytesnacks/domains/development/executions/ to see execution in the console. +``` + +### Inspecting a workflow run in the FlyteConsole web interface + +You can inspect the results of a workflow run by navigating to the URL produced by `pyflyte run` for the workflow execution. You should see FlyteConsole, the web interface used to manage Flyte entities such as tasks, workflows, and executions. The default execution view shows the list of tasks executing in sequential order. + +![Landing page of Flyte UI showing two successful tasks run for one workflow execution, along with Nodes, Graph, and Timeline view switcher links](https://raw.githubusercontent.com/flyteorg/static-resources/main/flytesnacks/getting_started/flyteconsole_default.png) + +#### Task panel + +Clicking on a single task will open a panel that shows task logs, inputs, outputs, and metadata. + +![Single task panel showing task logs, rerun task button, and executions, inputs, outputs, and task metadata sections](https://raw.githubusercontent.com/flyteorg/static-resources/main/flytesnacks/getting_started/flyteconsole_task_panel.png) + +#### Graph view + +The **Graph** view shows the execution graph of the workflow, providing visual information about the topology of the graph and the state of each node as the workflow progresses. + +![Graph view of single workflow execution showing directed acyclic graph of start node, say_hello_node, greeting_length node, and end node](https://raw.githubusercontent.com/flyteorg/static-resources/main/flytesnacks/getting_started/flyteconsole_graph_view.png) diff --git a/docs/introduction.md b/docs/introduction/index.md similarity index 96% rename from docs/introduction.md rename to docs/introduction/index.md index 8e61c433a4a..ce70f21061d 100644 --- a/docs/introduction.md +++ b/docs/introduction/index.md @@ -1,10 +1,4 @@ --- -jupytext: - formats: md:myst - text_representation: - extension: .md - format_name: myst - # override the toc-determined page navigation order next-page: getting_started/quickstart_guide next-page-title: Quickstart guide diff --git a/docs/introduction/quickstart_guide.md b/docs/introduction/quickstart_guide.md new file mode 100644 index 00000000000..cc62fbaaa27 --- /dev/null +++ b/docs/introduction/quickstart_guide.md @@ -0,0 +1,97 @@ +--- +# override the toc-determined page navigation order +prev-page: index +prev-page-title: Introduction to Flyte +next-page: getting_started/getting_started_with_workflow_development +next-page-title: Getting started with workflow development +--- + +(getting_started_quickstart_guide)= +# Quickstart guide + +In this guide, you will create and run a Flyte workflow in a local Python environment to generate the output "Hello, world!" + +## Prerequisites + +* [Install Python 3.8x or higher](https://www.python.org/downloads/) +* Install [Flytekit](https://github.com/flyteorg/flytekit) with `pip install -U flytekit` + +## Steps + +### 1. Create a "Hello, world!" file + +To create an example workflow file, copy the following into a file called `example.py`: + +```python +from flytekit import task, workflow + + +@task +def say_hello(name: str) -> str: + return f"Hello, {name}!" + + +@workflow +def hello_world_wf(name: str = 'world') -> str: + res = say_hello(name=name) + return res + + +if __name__ == "__main__": + print(f"Running wf() {hello_world_wf(name='passengers')}") +``` + +:::{note} +You can also use the `pyflyte init` command to initialize the "Hello, world!" Flyte project by running the following command: + +```{prompt} bash $ +pyflyte init --template hello-world hello-world +``` + +This will create a project directory that contains an `example.py` file with code above. +::: + +### 2. Run the example workflow in a local Python environment + +Next, run the workflow in the example workflow file with `pyflyte run`. The initial arguments of `pyflyte run` take the form of +`path/to/script.py `, where `` +refers to the function decorated with `@task` or `@workflow` that you wish to run: + +```{prompt} bash $ +pyflyte run example.py hello_world_wf +``` + +You can also provide a `name` argument to the workflow: +```{prompt} bash $ +pyflyte run example.py hello_world_wf --name Ada +``` + +:::{note} +If you created a "Hello, world" project using `pyflyte init`, you will need to change directories before running the workflow: +```{prompt} bash $ +cd hello-world +pyflyte run example.py hello_world_wf +``` +::: + +## The @task and @workflow decorators + +In this example, the file `example.py` contains a task and a workflow, decorated with the `@task` and `@workflow` decorators, respectively. You can invoke tasks and workflows like regular Python methods, and even import and use them in other Python modules or scripts. + +```python +@task +def say_hello(name: str) -> str: + return f"Hello, {name}!" + + +@workflow +def hello_world_wf(name: str = 'world') -> str: + res = say_hello(name=name) + return res +``` + +To learn more about tasks and workflows, see the {ref}`"Workflow code" section` of {doc}`"Flyte project components"`. + +## Next steps + +To create a productionizable Flyte project to structure your code according to software engineering best practices, and that can be used to package your code for deployment to a Flyte cluster, see {doc}`"Getting started with workflow development" `