Skip to content

Commit

Permalink
Fix the system test: dataflow_native_java
Browse files Browse the repository at this point in the history
  • Loading branch information
molcay committed Sep 3, 2024
1 parent d6f820d commit 7181ba2
Showing 1 changed file with 14 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,15 @@
PUBLIC_BUCKET = "airflow-system-tests-resources"

JAR_FILE_NAME = "word-count-beam-bundled-0.1.jar"
# For the distributed system, we need to store the JAR file in a folder that can be accessed by multiple
# worker.
# For example in Composer the correct path is gcs/data/word-count-beam-bundled-0.1.jar.
# Because gcs/data/ is shared folder for Airflow's workers.
IS_COMPOSER = bool(os.environ.get("COMPOSER_ENVIRONMENT", ""))
LOCAL_JAR = f"gcs/data/{JAR_FILE_NAME}" if IS_COMPOSER else JAR_FILE_NAME
REMOTE_JAR_FILE_PATH = f"dataflow/java/{JAR_FILE_NAME}"
GCS_OUTPUT = f"gs://{BUCKET_NAME}"
GCS_JAR = f"gs://{PUBLIC_BUCKET}/dataflow/java/{JAR_FILE_NAME}"
GCS_OUTPUT = f"gs://{BUCKET_NAME}"
LOCATION = "europe-west3"

with DAG(
Expand All @@ -70,13 +76,13 @@
task_id="download_file",
object_name=REMOTE_JAR_FILE_PATH,
bucket=PUBLIC_BUCKET,
filename=JAR_FILE_NAME,
filename=LOCAL_JAR,
)

# [START howto_operator_start_java_job_local_jar]
start_java_job_local = BeamRunJavaPipelineOperator(
task_id="start_java_job_local",
jar=JAR_FILE_NAME,
jar=LOCAL_JAR,
pipeline_options={
"output": GCS_OUTPUT,
},
Expand All @@ -92,31 +98,33 @@
# [START howto_operator_start_java_job_jar_on_gcs]
start_java_job = BeamRunJavaPipelineOperator(
runner=BeamRunnerType.DataflowRunner,
task_id="start-java-job",
task_id="start_java_job",
jar=GCS_JAR,
pipeline_options={
"output": GCS_OUTPUT,
},
job_class="org.apache.beam.examples.WordCount",
dataflow_config={
"job_name": "test-java-pipeline-job",
"check_if_running": CheckJobRunning.IgnoreJob,
"location": LOCATION,
"poll_sleep": 10,
"append_job_name": False,
},
)
# [END howto_operator_start_java_job_jar_on_gcs]

# [START howto_operator_start_java_job_jar_on_gcs_deferrable]
start_java_deferrable = BeamRunJavaPipelineOperator(
runner=BeamRunnerType.DataflowRunner,
task_id="start-java-job-deferrable",
task_id="start_java_job_deferrable",
jar=GCS_JAR,
pipeline_options={
"output": GCS_OUTPUT,
},
job_class="org.apache.beam.examples.WordCount",
dataflow_config={
"job_name": "test-java-pipeline-job",
"job_name": "test-deferrable-java-pipeline-job",
"check_if_running": CheckJobRunning.WaitForRun,
"location": LOCATION,
"poll_sleep": 10,
Expand Down

0 comments on commit 7181ba2

Please sign in to comment.