Add ability to create Flink Jobs in dataproc cluster
Ulada Zakharava committed Sep 18, 2024
1 parent c29f8dd commit 9607ac5
Showing 4 changed files with 146 additions and 3 deletions.
2 changes: 1 addition & 1 deletion airflow/providers/google/provider.yaml
@@ -125,7 +125,7 @@ dependencies:
- google-cloud-dataflow-client>=0.8.6
- google-cloud-dataform>=0.5.0
- google-cloud-dataplex>=1.10.0
- google-cloud-dataproc>=5.8.0
- google-cloud-dataproc>=5.12.0
- google-cloud-dataproc-metastore>=1.12.0
- google-cloud-dlp>=3.12.0
- google-cloud-kms>=2.15.0
@@ -249,7 +249,7 @@ Submit a job to a cluster
-------------------------

Dataproc supports submitting jobs for different big data components.
The list currently includes Spark, Hadoop, Pig and Hive.
The list currently includes Spark, PySpark, Hadoop, Trino, Pig, Flink and Hive.
For more information on versions and images take a look at `Cloud Dataproc Image version list <https://cloud.google.com/dataproc/docs/concepts/versioning/dataproc-versions>`__

To submit a job to the cluster you need to provide a job source file. The job source file can be on GCS, the cluster or on your local
@@ -351,6 +351,14 @@ Example of the configuration for a Trino Job:
    :start-after: [START how_to_cloud_dataproc_trino_config]
    :end-before: [END how_to_cloud_dataproc_trino_config]

Example of the configuration for a Flink Job:

.. exampleinclude:: /../../tests/system/providers/google/cloud/dataproc/example_dataproc_flink.py
    :language: python
    :dedent: 0
    :start-after: [START how_to_cloud_dataproc_flink_config]
    :end-before: [END how_to_cloud_dataproc_flink_config]
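
The Flink job configuration is submitted with the same ``DataprocSubmitJobOperator`` as the other job types. A minimal sketch, assuming the ``FLINK_JOB``, ``REGION`` and ``PROJECT_ID`` values defined in the example above:

.. code-block:: python

    flink_task = DataprocSubmitJobOperator(
        task_id="flink_task", job=FLINK_JOB, region=REGION, project_id=PROJECT_ID
    )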

Working with workflows templates
--------------------------------

2 changes: 1 addition & 1 deletion generated/provider_dependencies.json
@@ -645,7 +645,7 @@
"google-cloud-dataform>=0.5.0",
"google-cloud-dataplex>=1.10.0",
"google-cloud-dataproc-metastore>=1.12.0",
"google-cloud-dataproc>=5.8.0",
"google-cloud-dataproc>=5.12.0",
"google-cloud-dlp>=3.12.0",
"google-cloud-kms>=2.15.0",
"google-cloud-language>=2.9.0",
135 changes: 135 additions & 0 deletions tests/system/providers/google/cloud/dataproc/example_dataproc_flink.py
@@ -0,0 +1,135 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
Example Airflow DAG for DataprocSubmitJobOperator with a Flink job.
"""

from __future__ import annotations

import os
from datetime import datetime

from google.api_core.retry import Retry

from airflow.models.dag import DAG
from airflow.providers.google.cloud.operators.dataproc import (
    DataprocCreateClusterOperator,
    DataprocDeleteClusterOperator,
    DataprocSubmitJobOperator,
)
from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator, GCSDeleteBucketOperator
from airflow.utils.trigger_rule import TriggerRule
from tests.system.providers.google import DEFAULT_GCP_SYSTEM_TEST_PROJECT_ID

ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID", "default")
DAG_ID = "dataproc_flink"
PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT") or DEFAULT_GCP_SYSTEM_TEST_PROJECT_ID

BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}"
CLUSTER_NAME_BASE = f"cluster-{DAG_ID}".replace("_", "-")
CLUSTER_NAME_FULL = CLUSTER_NAME_BASE + f"-{ENV_ID}".replace("_", "-")
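# Fall back to the shorter base name if the full name (with the ENV_ID suffix) would be too long.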
CLUSTER_NAME = CLUSTER_NAME_BASE if len(CLUSTER_NAME_FULL) >= 33 else CLUSTER_NAME_FULL
REGION = "europe-west1"

OUTPUT_FOLDER = "wordcount"
OUTPUT_PATH = f"gs://{BUCKET_NAME}/{OUTPUT_FOLDER}/"

# Cluster definition
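# The FLINK optional component must be enabled on the cluster so it can run Flink jobs.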
CLUSTER_CONFIG = {
    "master_config": {
        "num_instances": 1,
        "machine_type_uri": "n1-standard-4",
        "disk_config": {"boot_disk_type": "pd-standard", "boot_disk_size_gb": 32},
    },
    "software_config": {"image_version": "2.2-debian12", "properties": {}, "optional_components": ["FLINK"]},
    "worker_config": {
        "num_instances": 3,
        "machine_type_uri": "n1-standard-4",
        "disk_config": {"boot_disk_type": "pd-standard", "boot_disk_size_gb": 32},
    },
}

# Jobs definitions
# [START how_to_cloud_dataproc_flink_config]
FLINK_JOB = {
    "reference": {"project_id": PROJECT_ID},
    "placement": {"cluster_name": CLUSTER_NAME},
    "flink_job": {
        "main_class": "org.apache.flink.examples.java.wordcount.WordCount",
        "jar_file_uris": ["file:///usr/lib/flink/examples/batch/WordCount.jar"],
    },
}
# [END how_to_cloud_dataproc_flink_config]


with DAG(
    DAG_ID,
    schedule="@once",
    start_date=datetime(2021, 1, 1),
    catchup=False,
    tags=["example", "dataproc", "flink"],
) as dag:
    create_bucket = GCSCreateBucketOperator(
        task_id="create_bucket", bucket_name=BUCKET_NAME, project_id=PROJECT_ID
    )

    create_cluster = DataprocCreateClusterOperator(
        task_id="create_cluster",
        project_id=PROJECT_ID,
        cluster_config=CLUSTER_CONFIG,
        region=REGION,
        cluster_name=CLUSTER_NAME,
        retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0),
    )

    flink_task = DataprocSubmitJobOperator(
        task_id="flink_task", job=FLINK_JOB, region=REGION, project_id=PROJECT_ID
    )

    delete_cluster = DataprocDeleteClusterOperator(
        task_id="delete_cluster",
        project_id=PROJECT_ID,
        cluster_name=CLUSTER_NAME,
        region=REGION,
        trigger_rule=TriggerRule.ALL_DONE,
    )

    delete_bucket = GCSDeleteBucketOperator(
        task_id="delete_bucket", bucket_name=BUCKET_NAME, trigger_rule=TriggerRule.ALL_DONE
    )

    (
        # TEST SETUP
        [create_bucket, create_cluster]
        # TEST BODY
        >> flink_task
        # TEST TEARDOWN
        >> [delete_cluster, delete_bucket]
    )

    from tests.system.utils.watcher import watcher

    # This test needs watcher in order to properly mark success/failure
    # when "teardown" task with trigger rule is part of the DAG
    list(dag.tasks) >> watcher()


from tests.system.utils import get_test_run # noqa: E402

# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
test_run = get_test_run(dag)
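
# For example (assuming GCP credentials and the environment variables above are configured):
#   pytest tests/system/providers/google/cloud/dataproc/example_dataproc_flink.py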
