Add ability to create Flink jobs in Dataproc cluster #112

Closed (1 commit)

2 changes: 1 addition & 1 deletion airflow/providers/google/provider.yaml
@@ -125,7 +125,7 @@ dependencies:
   - google-cloud-dataflow-client>=0.8.6
   - google-cloud-dataform>=0.5.0
   - google-cloud-dataplex>=1.10.0
-  - google-cloud-dataproc>=5.8.0
+  - google-cloud-dataproc>=5.12.0
   - google-cloud-dataproc-metastore>=1.12.0
   - google-cloud-dlp>=3.12.0
   - google-cloud-kms>=2.15.0
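The floor on `google-cloud-dataproc` is raised because the `flink_job` job type is only understood by newer client releases. A minimal sketch of a sanity check one could run in an Airflow environment (the assumption, based on this bump, is that the 5.12.x line is the first to expose `FlinkJob`):

```python
# Editor's sketch, not part of the diff: verify the installed google-cloud-dataproc
# client is new enough to serialize the "flink_job" field. The 5.12.0 threshold is
# taken from the version bump above; that FlinkJob is exposed from dataproc_v1.types
# is an assumption to check against the installed client.
from google.cloud.dataproc_v1 import types

if not hasattr(types, "FlinkJob"):
    raise RuntimeError(
        "google-cloud-dataproc is too old for Dataproc Flink jobs; "
        "install google-cloud-dataproc>=5.12.0"
    )
```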
@@ -249,7 +249,7 @@ Submit a job to a cluster
 -------------------------
 
 Dataproc supports submitting jobs of different big data components.
-The list currently includes Spark, Hadoop, Pig and Hive.
+The list currently includes Spark, PySpark, Hadoop, Trino, Pig, Flink and Hive.
 For more information on versions and images take a look at `Cloud Dataproc Image version list <https://cloud.google.com/dataproc/docs/concepts/versioning/dataproc-versions>`__
 
 To submit a job to the cluster you need to provide a job source file. The job source file can be on GCS, the cluster or on your local
@@ -351,6 +351,14 @@ Example of the configuration for a Trino Job:
     :start-after: [START how_to_cloud_dataproc_trino_config]
     :end-before: [END how_to_cloud_dataproc_trino_config]
 
+Example of the configuration for a Flink Job:
+
+.. exampleinclude:: /../../tests/system/providers/google/cloud/dataproc/example_dataproc_flink.py
+    :language: python
+    :dedent: 0
+    :start-after: [START how_to_cloud_dataproc_flink_config]
+    :end-before: [END how_to_cloud_dataproc_flink_config]
+
 Working with workflows templates
 --------------------------------
 
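The documentation change above wires the new Flink example into the job-submission section. For context, a minimal sketch of what submitting the same `FLINK_JOB` config looks like with the raw `google-cloud-dataproc` client, which is roughly what `DataprocSubmitJobOperator` does on the user's behalf (project, region and cluster values are placeholders):

```python
# Editor's sketch, not part of the diff: submit the Flink WordCount job with the
# plain Dataproc client instead of the Airflow operator. Arguments are placeholders;
# the job dict mirrors FLINK_JOB from the example DAG below.
from google.cloud import dataproc_v1


def submit_flink_wordcount(project_id: str, region: str, cluster_name: str) -> None:
    client = dataproc_v1.JobControllerClient(
        client_options={"api_endpoint": f"{region}-dataproc.googleapis.com:443"}
    )
    job = {
        "placement": {"cluster_name": cluster_name},
        "flink_job": {
            "main_class": "org.apache.flink.examples.java.wordcount.WordCount",
            "jar_file_uris": ["file:///usr/lib/flink/examples/batch/WordCount.jar"],
        },
    }
    # submit_job_as_operation returns a long-running operation; result() blocks
    # until the job reaches a terminal state.
    operation = client.submit_job_as_operation(
        request={"project_id": project_id, "region": region, "job": job}
    )
    finished = operation.result()
    print(f"Job {finished.reference.job_id} finished in state {finished.status.state.name}")
```

Called as `submit_flink_wordcount(PROJECT_ID, REGION, CLUSTER_NAME)` with the values from the example DAG, this runs the same WordCount jar that ships with the FLINK optional component.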
2 changes: 1 addition & 1 deletion generated/provider_dependencies.json
@@ -645,7 +645,7 @@
       "google-cloud-dataform>=0.5.0",
       "google-cloud-dataplex>=1.10.0",
       "google-cloud-dataproc-metastore>=1.12.0",
-      "google-cloud-dataproc>=5.8.0",
+      "google-cloud-dataproc>=5.12.0",
       "google-cloud-dlp>=3.12.0",
       "google-cloud-kms>=2.15.0",
       "google-cloud-language>=2.9.0",
135 changes: 135 additions & 0 deletions tests/system/providers/google/cloud/dataproc/example_dataproc_flink.py
@@ -0,0 +1,135 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
Example Airflow DAG for DataprocSubmitJobOperator with a Flink job.
"""

from __future__ import annotations

import os
from datetime import datetime

from google.api_core.retry import Retry

from airflow.models.dag import DAG
from airflow.providers.google.cloud.operators.dataproc import (
    DataprocCreateClusterOperator,
    DataprocDeleteClusterOperator,
    DataprocSubmitJobOperator,
)
from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator, GCSDeleteBucketOperator
from airflow.utils.trigger_rule import TriggerRule
from tests.system.providers.google import DEFAULT_GCP_SYSTEM_TEST_PROJECT_ID

ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID", "default")
DAG_ID = "dataproc_flink"
PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT") or DEFAULT_GCP_SYSTEM_TEST_PROJECT_ID

BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}"
CLUSTER_NAME_BASE = f"cluster-{DAG_ID}".replace("_", "-")
CLUSTER_NAME_FULL = CLUSTER_NAME_BASE + f"-{ENV_ID}".replace("_", "-")
CLUSTER_NAME = CLUSTER_NAME_BASE if len(CLUSTER_NAME_FULL) >= 33 else CLUSTER_NAME_FULL
REGION = "europe-west1"

OUTPUT_FOLDER = "wordcount"
OUTPUT_PATH = f"gs://{BUCKET_NAME}/{OUTPUT_FOLDER}/"

# Cluster definition; the FLINK optional component is required so the cluster can run Flink jobs
CLUSTER_CONFIG = {
    "master_config": {
        "num_instances": 1,
        "machine_type_uri": "n1-standard-4",
        "disk_config": {"boot_disk_type": "pd-standard", "boot_disk_size_gb": 32},
    },
    "software_config": {"image_version": "2.2-debian12", "properties": {}, "optional_components": ["FLINK"]},
    "worker_config": {
        "num_instances": 3,
        "machine_type_uri": "n1-standard-4",
        "disk_config": {"boot_disk_type": "pd-standard", "boot_disk_size_gb": 32},
    },
}

# Job definition
# [START how_to_cloud_dataproc_flink_config]
FLINK_JOB = {
    "reference": {"project_id": PROJECT_ID},
    "placement": {"cluster_name": CLUSTER_NAME},
    "flink_job": {
        "main_class": "org.apache.flink.examples.java.wordcount.WordCount",
        "jar_file_uris": ["file:///usr/lib/flink/examples/batch/WordCount.jar"],
    },
}
# [END how_to_cloud_dataproc_flink_config]


with DAG(
    DAG_ID,
    schedule="@once",
    start_date=datetime(2021, 1, 1),
    catchup=False,
    tags=["example", "dataproc", "flink"],
) as dag:
    create_bucket = GCSCreateBucketOperator(
        task_id="create_bucket", bucket_name=BUCKET_NAME, project_id=PROJECT_ID
    )

    create_cluster = DataprocCreateClusterOperator(
        task_id="create_cluster",
        project_id=PROJECT_ID,
        cluster_config=CLUSTER_CONFIG,
        region=REGION,
        cluster_name=CLUSTER_NAME,
        retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0),
    )

    flink_task = DataprocSubmitJobOperator(
        task_id="flink_task", job=FLINK_JOB, region=REGION, project_id=PROJECT_ID
    )

    delete_cluster = DataprocDeleteClusterOperator(
        task_id="delete_cluster",
        project_id=PROJECT_ID,
        cluster_name=CLUSTER_NAME,
        region=REGION,
        trigger_rule=TriggerRule.ALL_DONE,
    )

    delete_bucket = GCSDeleteBucketOperator(
        task_id="delete_bucket", bucket_name=BUCKET_NAME, trigger_rule=TriggerRule.ALL_DONE
    )

    (
        # TEST SETUP
        [create_bucket, create_cluster]
        # TEST BODY
        >> flink_task
        # TEST TEARDOWN
        >> [delete_cluster, delete_bucket]
    )

    from tests.system.utils.watcher import watcher

    # This test needs watcher in order to properly mark success/failure
    # when "teardown" task with trigger rule is part of the DAG
    list(dag.tasks) >> watcher()


from tests.system.utils import get_test_run # noqa: E402

# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
test_run = get_test_run(dag)
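As the trailing comment notes, the example DAG doubles as a system test. A minimal sketch of invoking it programmatically through pytest, assuming placeholder values for the environment variables the file reads at import time (see tests/system/README.md#run_via_pytest for the canonical instructions):

```python
# Editor's sketch, not part of the diff: run the new Flink system test via pytest.
# The environment variable values are placeholders for a real GCP project and
# system-test environment id.
import os

import pytest

os.environ.setdefault("SYSTEM_TESTS_ENV_ID", "demo")
os.environ.setdefault("SYSTEM_TESTS_GCP_PROJECT", "my-gcp-project")

pytest.main(["tests/system/providers/google/cloud/dataproc/example_dataproc_flink.py", "-s"])
```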