From 9607ac52f11a504ad464829662d2ccd1250cd1ba Mon Sep 17 00:00:00 2001 From: Ulada Zakharava Date: Wed, 18 Sep 2024 14:19:40 +0000 Subject: [PATCH] Add ability to create Flink Jobs in dataproc cluster --- airflow/providers/google/provider.yaml | 2 +- .../operators/cloud/dataproc.rst | 10 +- generated/provider_dependencies.json | 2 +- .../cloud/dataproc/example_dataproc_flink.py | 135 ++++++++++++++++++ 4 files changed, 146 insertions(+), 3 deletions(-) create mode 100644 tests/system/providers/google/cloud/dataproc/example_dataproc_flink.py diff --git a/airflow/providers/google/provider.yaml b/airflow/providers/google/provider.yaml index d69b3f8fd6e20..9bc36e0813c38 100644 --- a/airflow/providers/google/provider.yaml +++ b/airflow/providers/google/provider.yaml @@ -125,7 +125,7 @@ dependencies: - google-cloud-dataflow-client>=0.8.6 - google-cloud-dataform>=0.5.0 - google-cloud-dataplex>=1.10.0 - - google-cloud-dataproc>=5.8.0 + - google-cloud-dataproc>=5.12.0 - google-cloud-dataproc-metastore>=1.12.0 - google-cloud-dlp>=3.12.0 - google-cloud-kms>=2.15.0 diff --git a/docs/apache-airflow-providers-google/operators/cloud/dataproc.rst b/docs/apache-airflow-providers-google/operators/cloud/dataproc.rst index ae215c8a77b8f..1f7bac8566caf 100644 --- a/docs/apache-airflow-providers-google/operators/cloud/dataproc.rst +++ b/docs/apache-airflow-providers-google/operators/cloud/dataproc.rst @@ -249,7 +249,7 @@ Submit a job to a cluster ------------------------- Dataproc supports submitting jobs of different big data components. -The list currently includes Spark, Hadoop, Pig and Hive. +The list currently includes Spark, PySpark, Hadoop, Trino, Pig, Flink and Hive. For more information on versions and images take a look at `Cloud Dataproc Image version list `__ To submit a job to the cluster you need to provide a job source file. The job source file can be on GCS, the cluster or on your local @@ -351,6 +351,14 @@ Example of the configuration for a Trino Job: :start-after: [START how_to_cloud_dataproc_trino_config] :end-before: [END how_to_cloud_dataproc_trino_config] +Example of the configuration for a Flink Job: + +.. exampleinclude:: /../../tests/system/providers/google/cloud/dataproc/example_dataproc_flink.py + :language: python + :dedent: 0 + :start-after: [START how_to_cloud_dataproc_flink_config] + :end-before: [END how_to_cloud_dataproc_flink_config] + Working with workflows templates -------------------------------- diff --git a/generated/provider_dependencies.json b/generated/provider_dependencies.json index 0023c18cd0c08..ae5cfa70d16cd 100644 --- a/generated/provider_dependencies.json +++ b/generated/provider_dependencies.json @@ -645,7 +645,7 @@ "google-cloud-dataform>=0.5.0", "google-cloud-dataplex>=1.10.0", "google-cloud-dataproc-metastore>=1.12.0", - "google-cloud-dataproc>=5.8.0", + "google-cloud-dataproc>=5.12.0", "google-cloud-dlp>=3.12.0", "google-cloud-kms>=2.15.0", "google-cloud-language>=2.9.0", diff --git a/tests/system/providers/google/cloud/dataproc/example_dataproc_flink.py b/tests/system/providers/google/cloud/dataproc/example_dataproc_flink.py new file mode 100644 index 0000000000000..71b88325f6b86 --- /dev/null +++ b/tests/system/providers/google/cloud/dataproc/example_dataproc_flink.py @@ -0,0 +1,135 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +""" +Example Airflow DAG for DataprocSubmitJobOperator with hadoop job. +""" + +from __future__ import annotations + +import os +from datetime import datetime + +from google.api_core.retry import Retry + +from airflow.models.dag import DAG +from airflow.providers.google.cloud.operators.dataproc import ( + DataprocCreateClusterOperator, + DataprocDeleteClusterOperator, + DataprocSubmitJobOperator, +) +from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator, GCSDeleteBucketOperator +from airflow.utils.trigger_rule import TriggerRule +from tests.system.providers.google import DEFAULT_GCP_SYSTEM_TEST_PROJECT_ID + +ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID", "default") +DAG_ID = "dataproc_flink" +PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT") or DEFAULT_GCP_SYSTEM_TEST_PROJECT_ID + +BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}" +CLUSTER_NAME_BASE = f"cluster-{DAG_ID}".replace("_", "-") +CLUSTER_NAME_FULL = CLUSTER_NAME_BASE + f"-{ENV_ID}".replace("_", "-") +CLUSTER_NAME = CLUSTER_NAME_BASE if len(CLUSTER_NAME_FULL) >= 33 else CLUSTER_NAME_FULL +REGION = "europe-west1" + +OUTPUT_FOLDER = "wordcount" +OUTPUT_PATH = f"gs://{BUCKET_NAME}/{OUTPUT_FOLDER}/" + +# Cluster definition +CLUSTER_CONFIG = { + "master_config": { + "num_instances": 1, + "machine_type_uri": "n1-standard-4", + "disk_config": {"boot_disk_type": "pd-standard", "boot_disk_size_gb": 32}, + }, + "software_config": {"image_version": "2.2-debian12", "properties": {}, "optional_components": ["FLINK"]}, + "worker_config": { + "num_instances": 3, + "machine_type_uri": "n1-standard-4", + "disk_config": {"boot_disk_type": "pd-standard", "boot_disk_size_gb": 32}, + }, +} + +# Jobs definitions +# [START how_to_cloud_dataproc_flink_config] +FLINK_JOB = { + "reference": {"project_id": PROJECT_ID}, + "placement": {"cluster_name": CLUSTER_NAME}, + "flink_job": { + "main_class": "org.apache.flink.examples.java.wordcount.WordCount", + "jar_file_uris": ["file:///usr/lib/flink/examples/batch/WordCount.jar"], + }, +} +# [END how_to_cloud_dataproc_flink_config] + + +with DAG( + DAG_ID, + schedule="@once", + start_date=datetime(2021, 1, 1), + catchup=False, + tags=["example", "dataproc", "hadoop"], +) as dag: + create_bucket = GCSCreateBucketOperator( + task_id="create_bucket", bucket_name=BUCKET_NAME, project_id=PROJECT_ID + ) + + create_cluster = DataprocCreateClusterOperator( + task_id="create_cluster", + project_id=PROJECT_ID, + cluster_config=CLUSTER_CONFIG, + region=REGION, + cluster_name=CLUSTER_NAME, + retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0), + ) + + flink_task = DataprocSubmitJobOperator( + task_id="hadoop_task", job=FLINK_JOB, region=REGION, project_id=PROJECT_ID + ) + + delete_cluster = DataprocDeleteClusterOperator( + task_id="delete_cluster", + project_id=PROJECT_ID, + cluster_name=CLUSTER_NAME, + region=REGION, + trigger_rule=TriggerRule.ALL_DONE, + ) + + delete_bucket = GCSDeleteBucketOperator( + task_id="delete_bucket", bucket_name=BUCKET_NAME, trigger_rule=TriggerRule.ALL_DONE + ) + + ( + # TEST SETUP + [create_bucket, create_cluster] + # TEST BODY + >> flink_task + # TEST TEARDOWN + >> [delete_cluster, delete_bucket] + ) + + from tests.system.utils.watcher import watcher + + # This test needs watcher in order to properly mark success/failure + # when "teardown" task with trigger rule is part of the DAG + list(dag.tasks) >> watcher() + + +from tests.system.utils import get_test_run # noqa: E402 + +# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest) +test_run = get_test_run(dag)