diff --git a/tests/system/providers/google/cloud/dataflow/example_dataflow_native_java.py b/tests/system/providers/google/cloud/dataflow/example_dataflow_native_java.py index 9ef2efe3d7b20..df30a06e6f046 100644 --- a/tests/system/providers/google/cloud/dataflow/example_dataflow_native_java.py +++ b/tests/system/providers/google/cloud/dataflow/example_dataflow_native_java.py @@ -52,9 +52,15 @@ PUBLIC_BUCKET = "airflow-system-tests-resources" JAR_FILE_NAME = "word-count-beam-bundled-0.1.jar" +# For the distributed system, we need to store the JAR file in a folder that can be accessed by multiple +# worker. +# For example in Composer the correct path is gcs/data/word-count-beam-bundled-0.1.jar. +# Because gcs/data/ is shared folder for Airflow's workers. +IS_COMPOSER = bool(os.environ.get("COMPOSER_ENVIRONMENT", "")) +LOCAL_JAR = f"gcs/data/{JAR_FILE_NAME}" if IS_COMPOSER else JAR_FILE_NAME REMOTE_JAR_FILE_PATH = f"dataflow/java/{JAR_FILE_NAME}" -GCS_OUTPUT = f"gs://{BUCKET_NAME}" GCS_JAR = f"gs://{PUBLIC_BUCKET}/dataflow/java/{JAR_FILE_NAME}" +GCS_OUTPUT = f"gs://{BUCKET_NAME}" LOCATION = "europe-west3" with DAG( @@ -70,13 +76,13 @@ task_id="download_file", object_name=REMOTE_JAR_FILE_PATH, bucket=PUBLIC_BUCKET, - filename=JAR_FILE_NAME, + filename=LOCAL_JAR, ) # [START howto_operator_start_java_job_local_jar] start_java_job_local = BeamRunJavaPipelineOperator( task_id="start_java_job_local", - jar=JAR_FILE_NAME, + jar=LOCAL_JAR, pipeline_options={ "output": GCS_OUTPUT, }, @@ -92,16 +98,18 @@ # [START howto_operator_start_java_job_jar_on_gcs] start_java_job = BeamRunJavaPipelineOperator( runner=BeamRunnerType.DataflowRunner, - task_id="start-java-job", + task_id="start_java_job", jar=GCS_JAR, pipeline_options={ "output": GCS_OUTPUT, }, job_class="org.apache.beam.examples.WordCount", dataflow_config={ + "job_name": "test-java-pipeline-job", "check_if_running": CheckJobRunning.IgnoreJob, "location": LOCATION, "poll_sleep": 10, + "append_job_name": False, }, ) # [END howto_operator_start_java_job_jar_on_gcs] @@ -109,14 +117,14 @@ # [START howto_operator_start_java_job_jar_on_gcs_deferrable] start_java_deferrable = BeamRunJavaPipelineOperator( runner=BeamRunnerType.DataflowRunner, - task_id="start-java-job-deferrable", + task_id="start_java_job_deferrable", jar=GCS_JAR, pipeline_options={ "output": GCS_OUTPUT, }, job_class="org.apache.beam.examples.WordCount", dataflow_config={ - "job_name": "test-java-pipeline-job", + "job_name": "test-deferrable-java-pipeline-job", "check_if_running": CheckJobRunning.WaitForRun, "location": LOCATION, "poll_sleep": 10,