diff --git a/docs/deployment/agents/airflow.rst b/docs/deployment/agents/airflow.rst
new file mode 100644
index 0000000000..ad6a6dab36
--- /dev/null
+++ b/docs/deployment/agents/airflow.rst
@@ -0,0 +1,97 @@
+.. _deployment-agent-setup-airflow:
+
+Airflow agent
+=================
+
+This guide provides an overview of how to set up the Airflow agent in your Flyte deployment.
+Note that you don't need an Airflow cluster to run Airflow tasks, since Flytekit will
+automatically compile Airflow tasks to Flyte tasks and execute them on the Flyte cluster.
+
+Specify agent configuration
+----------------------------
+
+.. tabs::
+
+    .. group-tab:: Flyte binary
+
+        Edit the relevant YAML file to specify the agent.
+
+        .. code-block:: bash
+
+          kubectl edit configmap flyte-sandbox-config -n flyte
+
+        .. code-block:: yaml
+          :emphasize-lines: 7,11,16
+
+          tasks:
+            task-plugins:
+              enabled-plugins:
+                - container
+                - sidecar
+                - k8s-array
+                - agent-service
+              default-for-task-types:
+                - container: container
+                - container_array: k8s-array
+                - airflow: agent-service
+
+          plugins:
+            agent-service:
+              supportedTaskTypes:
+              - airflow
+
+    .. group-tab:: Flyte core
+
+        Create a file named ``values-override.yaml`` and add the following configuration to it.
+
+        .. code-block:: yaml
+
+          configmap:
+            enabled_plugins:
+              # -- Tasks specific configuration [structure](https://pkg.go.dev/github.com/flyteorg/flytepropeller/pkg/controller/nodes/task/config#GetConfig)
+              tasks:
+                # -- Plugins configuration, [structure](https://pkg.go.dev/github.com/flyteorg/flytepropeller/pkg/controller/nodes/task/config#TaskPluginConfig)
+                task-plugins:
+                  # -- [Enabled Plugins](https://pkg.go.dev/github.com/flyteorg/flyteplugins/go/tasks/config#Config). Enable sagemaker*, athena if you install the backend
+                  enabled-plugins:
+                    - container
+                    - sidecar
+                    - k8s-array
+                    - agent-service
+                  default-for-task-types:
+                    container: container
+                    sidecar: sidecar
+                    container_array: k8s-array
+                    airflow: agent-service
+              plugins:
+                agent-service:
+                  supportedTaskTypes:
+                  - airflow
+
+
+Upgrade the Flyte Helm release
+------------------------------
+
+.. tabs::
+
+  .. group-tab:: Flyte binary
+
+    .. code-block:: bash
+
+      helm upgrade flyteorg/flyte-binary -n --values
+
+    Replace ```` with the name of your release (e.g., ``flyte-backend``),
+    ```` with the name of your namespace (e.g., ``flyte``),
+    and ```` with the name of your YAML file.
+
+  .. group-tab:: Flyte core
+
+    .. code-block:: bash
+
+      helm upgrade flyte/flyte-core -n --values values-override.yaml
+
+    Replace ```` with the name of your release (e.g., ``flyte``)
+
+    and ```` with the name of your namespace (e.g., ``flyte``).
+
+For Airflow agent on the Flyte cluster, see `Airflow agent `_.
diff --git a/docs/deployment/agents/bigquery.rst b/docs/deployment/agents/bigquery.rst
index 9835c3d47a..d706ac7c37 100644
--- a/docs/deployment/agents/bigquery.rst
+++ b/docs/deployment/agents/bigquery.rst
@@ -1,6 +1,6 @@
 .. _deployment-agent-setup-bigquery:
 
-Google BigQuery Agent
+Google BigQuery agent
 ======================
 
 This guide provides an overview of setting up BigQuery agent in your Flyte deployment.
@@ -103,4 +103,4 @@ Upgrade the Flyte Helm release
 
     and ```` with the name of your namespace (e.g., ``flyte``).
 
-For BigQuery plugin on the Flyte cluster, please refer to `BigQuery Plugin Example `_
+For BigQuery agent on the Flyte cluster, see `BigQuery agent `_.
diff --git a/docs/deployment/agents/databricks.rst b/docs/deployment/agents/databricks.rst
index 00a5e97a47..3dbf7731c5 100644
--- a/docs/deployment/agents/databricks.rst
+++ b/docs/deployment/agents/databricks.rst
@@ -1,6 +1,6 @@
 .. _deployment-agent-setup-databricks:
 
-Databricks Agent
+Databricks agent
 =================
 
 This guide provides an overview of how to set up Databricks agent in your Flyte deployment.
@@ -291,4 +291,4 @@ Wait for the upgrade to complete. You can check the status of the deployment pod
 
   kubectl get pods -n flyte
 
-For databricks plugin on the Flyte cluster, please refer to `Databricks Plugin Example `_
+For Databricks agent on the Flyte cluster, see `Databricks agent `_.
diff --git a/docs/deployment/agents/index.md b/docs/deployment/agents/index.md
index e27644570a..0e114c8d06 100644
--- a/docs/deployment/agents/index.md
+++ b/docs/deployment/agents/index.md
@@ -2,22 +2,29 @@
 
 # Agent Setup
 
-.. tags:: Agent, Integration, Data, Advanced
+```{tags} Agent, Integration, Data, Advanced
+```
+
+To configure your Flyte deployment for agents, see the documentation below.
 
-Discover the process of setting up Agents for Flyte.
+:::{note}
+If you are using a managed deployment of Flyte, you will need to contact your deployment administrator to configure agents in your deployment.
+:::
 
 ```{list-table}
 :header-rows: 0
 :widths: 20 30
 
-* - {ref}`Bigquery Agent `
-  - Guide to setting up the Bigquery agent.
+* - {ref}`Airflow Agent `
+  - Configuring your Flyte deployment for the Airflow agent.
+* - {ref}`Databricks Agent `
+  - Configuring your Flyte deployment for the Databricks agent.
+* - {ref}`Google BigQuery Agent `
+  - Configuring your Flyte deployment for the BigQuery agent.
 * - {ref}`MMCloud Agent `
-  - Guide to setting up the MMCloud agent.
+  - Configuring your Flyte deployment for the MMCloud agent.
 * - {ref}`Sensor Agent `
-  - Guide to setting up the Sensor agent.
-* - {ref}`Databricks Agent `
-  - Guide to setting up the Databricks agent.
+  - Configuring your Flyte deployment for the sensor agent.
 ```
 
 ```{toctree}
@@ -25,8 +32,10 @@ Discover the process of setting up Agents for Flyte.
 :name: Agent setup
 :hidden:
 
+airflow
+databricks
 bigquery
 mmcloud
-databricks
 sensor
+snowflake
 ```
diff --git a/docs/deployment/agents/mmcloud.rst b/docs/deployment/agents/mmcloud.rst
index 217beab8ed..ac08f4fcdf 100644
--- a/docs/deployment/agents/mmcloud.rst
+++ b/docs/deployment/agents/mmcloud.rst
@@ -118,4 +118,4 @@ Wait for the upgrade to complete. You can check the status of the deployment pod
 
   kubectl get pods -n flyte
 
-For MMCloud plugin on the Flyte cluster, please refer to `Memory Machine Cloud Plugin Example `_
+For MMCloud agent on the Flyte cluster, see `MMCloud agent `_.
diff --git a/docs/deployment/agents/sensor.rst b/docs/deployment/agents/sensor.rst
index ecb45e426f..958e5d896a 100644
--- a/docs/deployment/agents/sensor.rst
+++ b/docs/deployment/agents/sensor.rst
@@ -1,13 +1,13 @@
 .. _deployment-agent-setup-sensor:
 
-Sensor Agent
+Sensor agent
 =================
 
-Sensor enables users to continuously check for a file or a condition to be met periodically.
+The `sensor agent `_ enables users to periodically check whether a file exists or a condition is met.
 
 When the condition is met, the sensor will complete.
 
-This guide provides an overview of how to set up Sensor in your Flyte deployment.
+This guide provides an overview of how to set up the sensor agent in your Flyte deployment.
 
 Spin up a cluster
 -----------------
@@ -43,7 +43,7 @@ Spin up a cluster
 
 Specify agent configuration
 ----------------------------
 
-Enable the Sensor agent by adding the following config to the relevant YAML file(s):
+Enable the sensor agent by adding the following config to the relevant YAML file(s):
 
 .. tabs::
@@ -77,7 +77,7 @@ Enable the Sensor agent by adding the following config to the relevant YAML file
 
     .. group-tab:: Flyte core
 
-      Create a file named ``values-override.yaml`` and add the following configuration to it.
+      Create a file named ``values-override.yaml`` and add the following configuration to it:
 
       .. code-block:: yaml
 
diff --git a/docs/deployment/agents/snowflake.rst b/docs/deployment/agents/snowflake.rst
new file mode 100644
index 0000000000..f4d82c0eb2
--- /dev/null
+++ b/docs/deployment/agents/snowflake.rst
@@ -0,0 +1,103 @@
+.. _deployment-agent-setup-snowflake:
+
+Snowflake agent
+=================
+
+This guide provides an overview of how to set up the Snowflake agent in your Flyte deployment.
+
+1. Set up the key pair authentication in Snowflake. For more details, see the `Snowflake key-pair authentication and key-pair rotation guide `__.
+2. Create a secret with the group "snowflake" and the key "private_key". For more details, see `"Using Secrets in a Task" `__.
+
+.. code-block:: bash
+
+  kubectl create secret generic snowflake-private-key --namespace=flytesnacks-development --from-file=your_private_key_above
+
+Specify agent configuration
+----------------------------
+
+.. tabs::
+
+    .. group-tab:: Flyte binary
+
+        Edit the relevant YAML file to specify the agent.
+
+        .. code-block:: bash
+
+          kubectl edit configmap flyte-sandbox-config -n flyte
+
+        .. code-block:: yaml
+          :emphasize-lines: 7,11,16
+
+          tasks:
+            task-plugins:
+              enabled-plugins:
+                - container
+                - sidecar
+                - k8s-array
+                - agent-service
+              default-for-task-types:
+                - container: container
+                - container_array: k8s-array
+                - snowflake: agent-service
+
+          plugins:
+            agent-service:
+              supportedTaskTypes:
+              - snowflake
+
+    .. group-tab:: Flyte core
+
+        Create a file named ``values-override.yaml`` and add the following configuration to it.
+
+        .. code-block:: yaml
+
+          configmap:
+            enabled_plugins:
+              # -- Tasks specific configuration [structure](https://pkg.go.dev/github.com/flyteorg/flytepropeller/pkg/controller/nodes/task/config#GetConfig)
+              tasks:
+                # -- Plugins configuration, [structure](https://pkg.go.dev/github.com/flyteorg/flytepropeller/pkg/controller/nodes/task/config#TaskPluginConfig)
+                task-plugins:
+                  # -- [Enabled Plugins](https://pkg.go.dev/github.com/flyteorg/flyteplugins/go/tasks/config#Config). Enable sagemaker*, athena if you install the backend
+                  enabled-plugins:
+                    - container
+                    - sidecar
+                    - k8s-array
+                    - agent-service
+                  default-for-task-types:
+                    container: container
+                    sidecar: sidecar
+                    container_array: k8s-array
+                    snowflake: agent-service
+              plugins:
+                agent-service:
+                  supportedTaskTypes:
+                  - snowflake
+
+Ensure that the propeller has the correct service account for BigQuery.
+
+Upgrade the Flyte Helm release
+------------------------------
+
+.. tabs::
+
+  .. group-tab:: Flyte binary
+
+    .. code-block:: bash
+
+      helm upgrade flyteorg/flyte-binary -n --values
+
+    Replace ```` with the name of your release (e.g., ``flyte-backend``),
+    ```` with the name of your namespace (e.g., ``flyte``),
+    and ```` with the name of your YAML file.
+
+  .. group-tab:: Flyte core
+
+    .. code-block:: bash
+
+      helm upgrade flyte/flyte-core -n --values values-override.yaml
+
+    Replace ```` with the name of your release (e.g., ``flyte``)
+
+    and ```` with the name of your namespace (e.g., ``flyte``).
+
+For Snowflake agent on the Flyte cluster, see `Snowflake agent `_.
diff --git a/docs/flyte_agents/developing_agents.md b/docs/flyte_agents/developing_agents.md
new file mode 100644
index 0000000000..bd7d1c7610
--- /dev/null
+++ b/docs/flyte_agents/developing_agents.md
@@ -0,0 +1,96 @@
+---
+jupytext:
+  formats: md:myst
+  text_representation:
+    extension: .md
+    format_name: myst
+---
+
+(developing_agents)=
+# Developing agents
+
+The Flyte agent framework enables rapid agent development, since agents are decoupled from the core FlytePropeller engine. Rather than building a complete gRPC service from scratch, you can implement an agent as a Python class, easing development. Agents can be tested independently and deployed privately, making maintenance easier and giving you more flexibility and control over development.
+
+If you need to create a new type of task, we recommend creating a new agent to run it rather than running the task in a pod. After testing the new agent, you can update your FlytePropeller configMap to specify the type of task that the agent should run.
+
+There are two types of agents: **async** and **sync**.
+* **Async agents** enable long-running jobs that execute on an external platform over time. They communicate with external services that have asynchronous APIs that support `create`, `get`, and `delete` operations. The vast majority of agents are async agents.
+* **Sync agents** enable request/response services that return immediate outputs (e.g. calling an internal API to fetch data or communicating with the OpenAI API).
+
+```{note}
+
+While agents can be written in any programming language, we currently only support Python agents. We may support other languages in the future.
+
+```
+
+## Async agent interface specification
+
+To create a new async agent, extend the `AsyncAgentBase` class in the `flytekit.extend.backend.base_agent` module and implement the `create`, `get`, and `delete` methods. All calls must be idempotent.
+
+- `create`: This method is used to initiate a new job. Users have the flexibility to use gRPC, REST, or an SDK to create a job.
+- `get`: This method retrieves the job resource (jobID or output literal) associated with the task, such as a BigQuery job ID or Databricks task ID.
+- `delete`: Invoking this method will send a request to delete the corresponding job.
+
+```python
+import json
+import typing
+from dataclasses import asdict, dataclass
+
+import requests
+from flyteidl.admin.agent_pb2 import CreateTaskResponse, DeleteTaskResponse, GetTaskResponse, Resource
+from flytekit.extend.backend.base_agent import AgentRegistry, AsyncAgentBase
+from flytekit.models.literals import LiteralMap
+from flytekit.models.task import TaskTemplate
+
+# Import paths can differ between flytekit and flyteidl releases; adjust them to your installed versions.
+# `url` and `data` below are placeholders for the external service's endpoint and request payload.
+
+@dataclass
+class Metadata:
+    # FlytePropeller will pass the metadata specified in this class to the agent.
+    # For example, if you add job_id to the metadata, the agent will use the job_id to get the job status.
+    # If you add s3 file path, the agent will check if the file exists.
+    job_id: str
+
+class CustomAsyncAgent(AsyncAgentBase):
+    def __init__(self, task_type: str):
+        # Each agent should have a unique task type.
+        # The Flyte agent service will use the task type
+        # to find the corresponding agent.
+        self._task_type = task_type
+
+    def create(
+        self,
+        output_prefix: str,
+        task_template: TaskTemplate,
+        inputs: typing.Optional[LiteralMap] = None,
+        **kwargs,
+    ) -> CreateTaskResponse:
+        # 1. Submit the task to the external service (BigQuery, Databricks, etc.)
+        # 2. Create metadata for the task, such as jobID.
+        # 3. Return the metadata, serialized to bytes.
+        # Assumes the service responds with JSON that contains a job_id field.
+        res = requests.post(url, json=data)
+        return CreateTaskResponse(resource_meta=json.dumps(asdict(Metadata(job_id=str(res.json()["job_id"])))).encode("utf-8"))
+
+    def get(self, resource_meta: bytes, **kwargs) -> GetTaskResponse:
+        # 1. Deserialize the metadata.
+        # 2. Use the metadata to get the job status.
+        # 3. Return the job status.
+        # Assumes the JSON response carries a state value that maps to a Flyte task state.
+        metadata = Metadata(**json.loads(resource_meta.decode("utf-8")))
+        res = requests.get(url, json={"job_id": metadata.job_id})
+        return GetTaskResponse(resource=Resource(state=res.json()["state"]))
+
+    def delete(self, resource_meta: bytes, **kwargs) -> DeleteTaskResponse:
+        # 1. Deserialize the metadata.
+        # 2. Use the metadata to delete the job.
+        metadata = Metadata(**json.loads(resource_meta.decode("utf-8")))
+        requests.delete(url, json={"job_id": metadata.job_id})
+        return DeleteTaskResponse()
+
+# Register the custom agent. "custom_task" is a placeholder task type.
+AgentRegistry.register(CustomAsyncAgent(task_type="custom_task"))
+```
+
+For an example implementation, see the [BigQuery agent](https://github.com/flyteorg/flytekit/blob/master/plugins/flytekit-bigquery/flytekitplugins/bigquery/agent.py#L43).
diff --git a/docs/flyte_agents/enabling_agents_in_your_flyte_deployment.md b/docs/flyte_agents/enabling_agents_in_your_flyte_deployment.md
new file mode 100644
index 0000000000..f50b740a21
--- /dev/null
+++ b/docs/flyte_agents/enabling_agents_in_your_flyte_deployment.md
@@ -0,0 +1,16 @@
+---
+jupytext:
+  formats: md:myst
+  text_representation:
+    extension: .md
+    format_name: myst
+---
+
+(enabling_agents_in_your_flyte_deploymen)=
+# Enabling agents in your Flyte deployment
+
+After you have finished {ref}`testing an agent locally `, you can enable the agent in your Flyte deployment to use it in production. To enable a particular agent in your Flyte deployment, see the [Agent setup guide](https://docs.flyte.org/en/latest/deployment/agents/index.html) for the agent.
+
+:::{note}
+If you are using a managed deployment of Flyte, you will need to contact your deployment administrator to enable agents in your deployment.
+:::
diff --git a/docs/flyte_agents/index.md b/docs/flyte_agents/index.md
new file mode 100644
index 0000000000..d56c7fed4b
--- /dev/null
+++ b/docs/flyte_agents/index.md
@@ -0,0 +1,48 @@
+---
+# override the toc-determined page navigation order
+prev-page: getting_started/extending_flyte
+prev-page-title: Extending Flyte
+---
+
+(flyte_agents_guide)=
+# Flyte agents
+
+Flyte agents are long-running, stateless services that receive execution requests via gRPC and initiate jobs with appropriate external or internal services. They enable two key workflows: asynchronously launching jobs on hosted platforms (e.g. Databricks or Snowflake) and calling external synchronous services, such as access control, data retrieval, and model inference.
+
+Each agent service is a Kubernetes deployment that receives gRPC requests from FlytePropeller when users trigger a particular type of task (for example, the BigQuery agent handles BigQuery tasks). The agent service then initiates a job with the appropriate service. Since agents can be spawned in process, all services can run locally as long as the connection secrets are available. Moreover, agents use a protobuf interface and can therefore be implemented in any language, which allows for flexibility, reuse of existing libraries, and simpler testing.
+
+You can create different agent services that host different agents, e.g., a production and a development agent service.
+
+:::{figure} https://i.ibb.co/vXhBDjP/Screen-Shot-2023-05-29-at-2-54-14-PM.png
+:alt: Agent Service
+:class: with-shadow
+:::
+
+## Using agents in tasks
+
+If you need to connect to an external service in your workflow, we recommend using the corresponding agent rather than a web API plugin. Agents are designed to be scalable, can handle large workloads efficiently, and decrease the load on FlytePropeller, since they run outside of it. You can also test agents locally without having to change the Flyte backend configuration, streamlining development.
+
+For a list of agents you can use in your tasks and example usage for each, see the [Integrations](https://docs.flyte.org/en/latest/flytesnacks/integrations.html#agents) documentation.
+
+## Table of contents
+
+```{list-table}
+:header-rows: 0
+:widths: 20 30
+
+* - {doc}`Developing agents `
+  - If the agent you need doesn't exist, follow these steps to create it.
+* - {doc}`Testing agents locally `
+  - Whether using an existing agent or developing a new one, you can test the agent locally without needing to configure your Flyte deployment.
+* - {doc}`Enabling agents in your Flyte deployment `
+  - Once you have tested an agent locally and want to use it in production, you must configure your Flyte deployment for the agent.
+```
+
+```{toctree}
+:maxdepth: -1
+:hidden:
+
+developing_agents
+testing_agents_locally
+enabling_agents_in_your_flyte_deployment
+```
diff --git a/docs/flyte_agents/testing_agents_locally.md b/docs/flyte_agents/testing_agents_locally.md
new file mode 100644
index 0000000000..7874d0bca1
--- /dev/null
+++ b/docs/flyte_agents/testing_agents_locally.md
@@ -0,0 +1,48 @@
+---
+jupytext:
+  formats: md:myst
+  text_representation:
+    extension: .md
+    format_name: myst
+---
+
+(testing_agents_locally)=
+# Testing agents locally
+
+You can test agents locally without running the backend server, making agent development easier.
+
+To test an agent locally, create a class for the agent task that inherits from [AsyncAgentExecutorMixin](https://github.com/flyteorg/flytekit/blob/master/flytekit/extend/backend/base_agent.py#L155). This mixin can handle both asynchronous tasks and synchronous tasks and allows flytekit to mimic FlytePropeller's behavior in calling the agent.
+
+## BigQuery example
+
+To test the BigQuery example, copy the following code to a file called `wf.py`, modifying as needed.
+
+```{note}
+
+In some cases, you will need to store credentials in your local environment when testing locally.
+For example, you need to set the `GOOGLE_APPLICATION_CREDENTIALS` environment variable when running BigQuery tasks to test the BigQuery agent.
+
+```
+
+```python
+class BigQueryTask(AsyncAgentExecutorMixin, SQLTask[BigQueryConfig]):
+    def __init__(self, name: str, **kwargs):
+        ...
+
+
+# Instantiate the task class. Flytekit will automatically call the agent
+# to `create`, `get`, or `delete` the job.
+bigquery_doge_coin = BigQueryTask(
+    name="bigquery.doge_coin",
+    inputs=kwtypes(version=int),
+    query_template="SELECT * FROM `bigquery-public-data.crypto_dogecoin.transactions` WHERE version = @version LIMIT 10;",
+    output_structured_dataset_type=StructuredDataset,
+    task_config=BigQueryConfig(ProjectID="flyte-test-340607")
+)
+```
+
+You can run the above example task locally and test the agent with the following command:
+
+```bash
+pyflyte run wf.py bigquery_doge_coin --version 10
+```
diff --git a/docs/index.md b/docs/index.md
index 3a8d38e6ba..f4bb847096 100644
--- a/docs/index.md
+++ b/docs/index.md
@@ -138,6 +138,7 @@
 Introduction
 Quickstart guide
 Getting started with workflow development
 Flyte fundamentals
+Flyte agents
 Core use cases
 ```