Commit: Move intro docs from flytesnacks to flyte
Signed-off-by: Peeter Piegaze <[email protected]>

Showing 19 changed files with 2,938 additions and 11 deletions.

---
jupytext:
  formats: md:myst
  text_representation:
    extension: .md
    format_name: myst
kernelspec:
  display_name: Python 3
  language: python
  name: python3

next-page: userguide
next-page-title: User Guide
---

(getting_started_analytics)=

# Analytics

Flyte is ideal for data cleaning, statistical summarization, and plotting
because with `flytekit` you can leverage the rich Python ecosystem of data
processing and visualization tools.

## Cleaning Data

In this example, we're going to analyze some COVID-19 vaccination data:

```{code-cell} ipython3
import pandas as pd
import plotly
import plotly.graph_objects as go

from flytekit import Deck, task, workflow, Resources


@task(requests=Resources(mem="1Gi"))
def clean_data() -> pd.DataFrame:
    """Clean the dataset."""
    df = pd.read_csv("https://covid.ourworldindata.org/data/owid-covid-data.csv")
    filled_df = (
        df.sort_values(["people_vaccinated"], ascending=False)
        .groupby("location")
        .first()
        .reset_index()
    )[["location", "people_vaccinated", "population", "date"]]
    return filled_df
```

As you can see, we're using `pandas` for data processing, and in the task
below we use `plotly` to create a choropleth map of the percentage of each
country's population that has received at least one COVID-19 vaccination.

## Rendering Plots

We can use {ref}`Flyte Decks <decks>` to render a static HTML report
of the map. In this case, we normalize the `people_vaccinated` count by the
`population` of each country:

```{code-cell} ipython3
@task(disable_deck=False)
def plot(df: pd.DataFrame):
    """Render a Choropleth map."""
    df["text"] = df["location"] + "<br>" + "Last updated on: " + df["date"]
    fig = go.Figure(
        data=go.Choropleth(
            locations=df["location"],
            z=df["people_vaccinated"].astype(float) / df["population"].astype(float),
            text=df["text"],
            locationmode="country names",
            colorscale="Blues",
            autocolorscale=False,
            reversescale=False,
            marker_line_color="darkgray",
            marker_line_width=0.5,
            zmax=1,
            zmin=0,
        )
    )
    fig.update_layout(
        title_text=(
            "Percent population with at least one dose of COVID-19 vaccine"
        ),
        geo_scope="world",
        geo=dict(
            showframe=False, showcoastlines=False, projection_type="equirectangular"
        ),
    )
    Deck("Choropleth Map", plotly.io.to_html(fig))


@workflow
def analytics_workflow():
    """Prepare a data analytics workflow."""
    plot(df=clean_data())
```

Running this workflow, we get an interactive plot, courtesy of `plotly`:

```{code-cell} ipython3
---
tags: [remove-input]
---
# this is an unrendered cell, used to capture the logs in order to render the
# Flyte Decks directly in the docs.
import logging
import os
import re

from pythonjsonlogger import jsonlogger
from IPython.display import HTML


class DeckFilter(logging.Filter):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.formatter = jsonlogger.JsonFormatter(
            fmt="%(asctime)s %(name)s %(levelname)s %(message)s"
        )
        self.logs = []
        self.deck_files = {}

    def filter(self, record):
        patt = "(.+) task creates flyte deck html to (.+/deck.html)"
        msg = record.getMessage()
        matches = re.match(patt, msg)
        if msg == "Connection error. Skip stats collection.":
            return False
        if matches:
            task, filepath = matches.group(1), matches.group(2)
            self.logs.append(self.formatter.format(record))
            self.deck_files[task] = re.sub("^file://", "", filepath)
        return False


logger = logging.getLogger("flytekit")
logger.setLevel(20)
deck_filter = DeckFilter()
logger.addFilter(deck_filter)
```

```{code-cell} ipython3
analytics_workflow()
```

```{code-cell} ipython3
---
tags: [remove-input]
---
import os
import shutil
from pathlib import Path


def cp_deck(src):
    src = Path(src)
    target = Path.cwd() / "_flyte_decks" / src.parent.name
    target.mkdir(parents=True, exist_ok=True)
    shutil.copy(src, target)
    return target / "deck.html"


logger.removeFilter(deck_filter)
HTML(filename=cp_deck(deck_filter.deck_files["plot"]))
```

## Custom Flyte Deck Renderers

You can also create your own {ref}`custom Flyte Deck renderers <getting_started_customer_renderers>`
to visualize data with any plotting/visualization library of your choice, as
long as you can render HTML for the objects of interest.
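
As a rough sketch of the idea (the `SummaryRenderer` class and `summarize` task
below are illustrative, not part of flytekit), a renderer is simply an object
that turns a Python value into an HTML string, which you then hand to a `Deck`:

```python
import pandas as pd

from flytekit import Deck, task


class SummaryRenderer:
    """Illustrative custom renderer: any object that can produce an HTML string."""

    def to_html(self, df: pd.DataFrame) -> str:
        # Render summary statistics as an HTML table via pandas.
        return df.describe().to_html()


@task(disable_deck=False)
def summarize(df: pd.DataFrame):
    # Attach the rendered HTML to a named tab in the Flyte Deck UI.
    Deck("Summary Statistics", SummaryRenderer().to_html(df))
```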

```{important}
Prefer other data processing frameworks? Flyte ships with
[Polars](https://github.com/flyteorg/flytekit/tree/master/plugins/flytekit-polars),
{ref}`Dask <plugins-dask-k8s>`, {ref}`Modin <modin-integration>`, {ref}`Spark <plugins-spark-k8s>`,
[Vaex](https://github.com/flyteorg/flytekit/tree/master/plugins/flytekit-vaex),
and [DBT](https://github.com/flyteorg/flytekit/tree/master/plugins/flytekit-dbt)
integrations.

If you need to connect to a database, Flyte provides first-party
support for {ref}`AWS Athena <aws-athena>`, {ref}`Google BigQuery <big-query>`,
{ref}`Snowflake <plugins-snowflake>`, {ref}`SQLAlchemy <sql_alchemy>`, and
{ref}`SQLite3 <integrations_sql_sqlite3>`.
```

---
jupytext:
  formats: md:myst
  text_representation:
    extension: .md
    format_name: myst
kernelspec:
  display_name: Python 3
  language: python
  name: python3
---

(getting_started_data_engineering)=

# Data Engineering

Flyte is well-suited for data engineering use cases, where you can interleave
SQL queries with data processing logic implemented in Python, using whichever
data processing tools you prefer.

In this example, we create an ETL workflow that extracts data from a public
[RNA database](https://rnacentral.org/help/public-database), performs some simple
transforms on the data, and loads it into a CSV file.

## Extract

First, we define an `extract_task` task using the
{ref}`flytekitplugins-sqlalchemy <sql_alchemy>` plugin, which provides an
interface to perform SQL queries via the
{py:class}`~flytekitplugins.sqlalchemy.SQLAlchemyTask`
and {py:class}`~flytekitplugins.sqlalchemy.SQLAlchemyConfig` classes.

```{code-cell} ipython3
import os

import flytekit
import pandas as pd
from flytekit import Resources, kwtypes, task, workflow
from flytekit.types.file import CSVFile
from flytekitplugins.sqlalchemy import SQLAlchemyConfig, SQLAlchemyTask

DATABASE_URI = (
    "postgresql://reader:[email protected]:5432/pfmegrnargs"
)

extract_task = SQLAlchemyTask(
    "extract_rna",
    query_template=(
        "select len as sequence_length, timestamp from rna "
        "where len >= {{ .inputs.min_length }} and len <= {{ .inputs.max_length }} "
        "limit {{ .inputs.limit }}"
    ),
    inputs=kwtypes(min_length=int, max_length=int, limit=int),
    output_schema_type=pd.DataFrame,
    task_config=SQLAlchemyConfig(uri=DATABASE_URI),
)
```

You can format the `query_template` with `{{ .inputs.<input_name> }}` to
parameterize your query with the `inputs` keyword type specification, which
maps task argument names to their expected types.

```{important}
You can request access to secrets via the `secret_requests` argument of the
{py:class}`~flytekitplugins.sqlalchemy.SQLAlchemyTask` constructor, then
pass in a `secret_connect_args` argument to the
{py:class}`~flytekitplugins.sqlalchemy.SQLAlchemyConfig` constructor, assuming
that you have connection credentials available in the configured
{ref}`Secrets Management System <secrets>`, which is K8s by default.
```
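
For illustration, here is a hedged sketch of what that might look like. The
secret group and key names are placeholders, the URI is assumed, and the exact
shape of `secret_connect_args` should be checked against the plugin's
documentation:

```python
import pandas as pd
from flytekit import Secret, kwtypes
from flytekitplugins.sqlalchemy import SQLAlchemyConfig, SQLAlchemyTask

# Placeholder secret coordinates; use whatever group/key is registered in your
# secrets backend (Kubernetes secrets by default).
db_password = Secret(group="rna-db", key="password")

secure_extract_task = SQLAlchemyTask(
    "secure_extract_rna",
    query_template="select len as sequence_length, timestamp from rna limit {{ .inputs.limit }}",
    inputs=kwtypes(limit=int),
    output_schema_type=pd.DataFrame,
    task_config=SQLAlchemyConfig(
        # Assumed URI without an inline password; the secret supplies it at runtime.
        uri="postgresql://reader@<host>:5432/pfmegrnargs",
        secret_connect_args={"password": db_password},
    ),
    secret_requests=[db_password],
)
```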

## Transform

Next, we parse the raw `timestamp`s and represent the time as separate `date`
and `time` columns. Notice that we can encode the assumptions we have about this
task's resource requirements with the {py:class}`~flytekit.Resources` object.
If those assumptions ever change, we can update the resource request here, or
override it at the workflow level with the {ref}`with_overrides <resource_with_overrides>` method.

```{code-cell} ipython3
@task(requests=Resources(mem="700Mi"))
def transform(df: pd.DataFrame) -> pd.DataFrame:
    """Add date and time columns; drop timestamp column."""
    timestamp = pd.to_datetime(df["timestamp"])
    df["date"] = timestamp.dt.date
    df["time"] = timestamp.dt.time
    df.drop("timestamp", axis=1, inplace=True)
    return df
```
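
As a quick illustration of the call-site override mentioned above (the workflow
below is a hypothetical variant of the pipeline, and the `2Gi` figure is
arbitrary), the memory request can be bumped without touching the task
definition:

```python
@workflow
def etl_with_bigger_transform(limit: int = 100) -> pd.DataFrame:
    df = extract_task(min_length=50, max_length=200, limit=limit)
    # Override the memory request declared on the task for this call only.
    return transform(df=df).with_overrides(requests=Resources(mem="2Gi"))
```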

## Load

Finally, we load the transformed data into its final destination: a CSV file in
blob storage. Flyte has a built-in `CSVFile` type that automatically handles
serializing/deserializing and uploading/downloading the file as it's passed from
one task to the next. All you need to do is write the file to some local location
and pass that location to the `path` argument of `CSVFile`.

```{code-cell} ipython3
@task(requests=Resources(mem="700Mi"))
def load(df: pd.DataFrame) -> CSVFile:
    """Load the dataframe to a csv file."""
    csv_file = os.path.join(flytekit.current_context().working_directory, "rna_df.csv")
    df.to_csv(csv_file, index=False)
    return CSVFile(path=csv_file)
```

## ETL Workflow

Putting all the pieces together, we create an `etl_workflow` that produces a
dataset based on the parameters you give it.

```{code-cell} ipython3
@workflow
def etl_workflow(
    min_length: int = 50, max_length: int = 200, limit: int = 10
) -> CSVFile:
    """Build an extract, transform and load pipeline."""
    return load(
        df=transform(
            df=extract_task(min_length=min_length, max_length=max_length, limit=limit)
        )
    )
```

During local execution, this CSV file lives in a random local directory, but
when the workflow is run on a Flyte cluster, it lives in the configured blob
store, like S3 or GCS.

Running this workflow locally, we can access the CSV file and read it into
a `pandas.DataFrame`.

```{code-cell} ipython3
csv_file = etl_workflow(limit=5)
pd.read_csv(csv_file)
```

## Workflows as Reusable Components

Because Flyte tasks and workflows are simply functions, we can embed
`etl_workflow` as part of a larger workflow, where it's used to create a
CSV file that's then consumed by downstream tasks or subworkflows:

```{code-cell} ipython3
@task
def aggregate(file: CSVFile) -> pd.DataFrame:
    data = pd.read_csv(file)
    ...  # process the data further


@task
def plot(data: pd.DataFrame):
    ...  # create a plot


@workflow
def downstream_workflow(
    min_length: int = 50, max_length: int = 200, limit: int = 10
):
    """A downstream workflow that visualizes an aggregation of the data."""
    csv_file = etl_workflow(
        min_length=min_length,
        max_length=max_length,
        limit=limit,
    )
    return plot(data=aggregate(file=csv_file))
```

```{important}
Prefer other data processing frameworks? Flyte ships with
[Polars](https://github.com/flyteorg/flytekit/tree/master/plugins/flytekit-polars),
{ref}`Dask <plugins-dask-k8s>`, {ref}`Modin <modin-integration>`, {ref}`Spark <plugins-spark-k8s>`,
[Vaex](https://github.com/flyteorg/flytekit/tree/master/plugins/flytekit-vaex),
and [DBT](https://github.com/flyteorg/flytekit/tree/master/plugins/flytekit-dbt)
integrations.

For database connectors, Flyte provides first-party support for {ref}`AWS Athena <aws-athena>`,
{ref}`Google BigQuery <big-query>`, {ref}`Snowflake <plugins-snowflake>`,
{ref}`SQLAlchemy <sql_alchemy>`, and {ref}`SQLite3 <integrations_sql_sqlite3>`.
```