diff --git a/tests/system/providers/google/cloud/dataflow/example_dataflow_native_python.py b/tests/system/providers/google/cloud/dataflow/example_dataflow_native_python.py
index dc8449cb316eb..346ba76db004b 100644
--- a/tests/system/providers/google/cloud/dataflow/example_dataflow_native_python.py
+++ b/tests/system/providers/google/cloud/dataflow/example_dataflow_native_python.py
@@ -24,27 +24,24 @@
 import os
 from datetime import datetime
-from pathlib import Path
 
 from airflow.models.dag import DAG
 from airflow.providers.apache.beam.hooks.beam import BeamRunnerType
 from airflow.providers.apache.beam.operators.beam import BeamRunPythonPipelineOperator
 from airflow.providers.google.cloud.operators.dataflow import DataflowStopJobOperator
 from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator, GCSDeleteBucketOperator
-from airflow.providers.google.cloud.transfers.local_to_gcs import LocalFilesystemToGCSOperator
 from airflow.utils.trigger_rule import TriggerRule
 
 ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
 DAG_ID = "dataflow_native_python"
+RESOURCE_DATA_BUCKET = "airflow-system-tests-resources"
 BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}"
-PYTHON_FILE_NAME = "wordcount_debugging.py"
 GCS_TMP = f"gs://{BUCKET_NAME}/temp/"
 GCS_STAGING = f"gs://{BUCKET_NAME}/staging/"
 GCS_OUTPUT = f"gs://{BUCKET_NAME}/output"
-GCS_PYTHON_SCRIPT = f"gs://{BUCKET_NAME}/{PYTHON_FILE_NAME}"
-PYTHON_FILE_LOCAL_PATH = str(Path(__file__).parent / "resources" / PYTHON_FILE_NAME)
+GCS_PYTHON_SCRIPT = f"gs://{RESOURCE_DATA_BUCKET}/dataflow/python/wordcount_debugging.py"
 LOCATION = "europe-west3"
 
 default_args = {
@@ -64,13 +61,6 @@
 ) as dag:
     create_bucket = GCSCreateBucketOperator(task_id="create_bucket", bucket_name=BUCKET_NAME)
 
-    upload_file = LocalFilesystemToGCSOperator(
-        task_id="upload_file_to_bucket",
-        src=PYTHON_FILE_LOCAL_PATH,
-        dst=PYTHON_FILE_NAME,
-        bucket=BUCKET_NAME,
-    )
-
     # [START howto_operator_start_python_job]
     start_python_job = BeamRunPythonPipelineOperator(
         runner=BeamRunnerType.DataflowRunner,
@@ -80,10 +70,10 @@
         pipeline_options={
             "output": GCS_OUTPUT,
         },
-        py_requirements=["apache-beam[gcp]==2.46.0"],
+        py_requirements=["apache-beam[gcp]==2.47.0"],
         py_interpreter="python3",
         py_system_site_packages=False,
-        dataflow_config={"location": LOCATION},
+        dataflow_config={"location": LOCATION, "job_name": "start_python_job"},
     )
     # [END howto_operator_start_python_job]
@@ -94,7 +84,7 @@
         pipeline_options={
             "output": GCS_OUTPUT,
         },
-        py_requirements=["apache-beam[gcp]==2.46.0"],
+        py_requirements=["apache-beam[gcp]==2.47.0"],
         py_interpreter="python3",
         py_system_site_packages=False,
     )
@@ -114,7 +104,6 @@
     (
         # TEST SETUP
         create_bucket
-        >> upload_file
         # TEST BODY
         >> start_python_job
         >> start_python_job_local
diff --git a/tests/system/providers/google/cloud/dataflow/example_dataflow_native_python_async.py b/tests/system/providers/google/cloud/dataflow/example_dataflow_native_python_async.py
index 01a7bce74ccc7..c7c2e62e76130 100644
--- a/tests/system/providers/google/cloud/dataflow/example_dataflow_native_python_async.py
+++ b/tests/system/providers/google/cloud/dataflow/example_dataflow_native_python_async.py
@@ -24,7 +24,6 @@
 import os
 from datetime import datetime
-from pathlib import Path
 from typing import Callable
 
 from airflow.exceptions import AirflowException
@@ -39,20 +38,18 @@
     DataflowJobMetricsSensor,
     DataflowJobStatusSensor,
 )
-from airflow.providers.google.cloud.transfers.local_to_gcs import LocalFilesystemToGCSOperator
 from airflow.utils.trigger_rule import TriggerRule
 
 ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
 DAG_ID = "dataflow_native_python_async"
+RESOURCE_DATA_BUCKET = "airflow-system-tests-resources"
 BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}"
-PYTHON_FILE_NAME = "wordcount_debugging.txt"
 GCS_TMP = f"gs://{BUCKET_NAME}/temp/"
 GCS_STAGING = f"gs://{BUCKET_NAME}/staging/"
 GCS_OUTPUT = f"gs://{BUCKET_NAME}/output"
-GCS_PYTHON_SCRIPT = f"gs://{BUCKET_NAME}/{PYTHON_FILE_NAME}"
-PYTHON_FILE_LOCAL_PATH = str(Path(__file__).parent / "resources" / PYTHON_FILE_NAME)
+GCS_PYTHON_SCRIPT = f"gs://{RESOURCE_DATA_BUCKET}/dataflow/python/wordcount_debugging.py"
 LOCATION = "europe-west3"
 
 default_args = {
@@ -72,13 +69,6 @@
 ) as dag:
     create_bucket = GCSCreateBucketOperator(task_id="create_bucket", bucket_name=BUCKET_NAME)
 
-    upload_file = LocalFilesystemToGCSOperator(
-        task_id="upload_file_to_bucket",
-        src=PYTHON_FILE_LOCAL_PATH,
-        dst=PYTHON_FILE_NAME,
-        bucket=BUCKET_NAME,
-    )
-
     # [START howto_operator_start_python_job_async]
     start_python_job_async = BeamRunPythonPipelineOperator(
         task_id="start_python_job_async",
@@ -88,7 +78,7 @@
         pipeline_options={
             "output": GCS_OUTPUT,
         },
-        py_requirements=["apache-beam[gcp]==2.46.0"],
+        py_requirements=["apache-beam[gcp]==2.47.0"],
         py_interpreter="python3",
         py_system_site_packages=False,
         dataflow_config={
@@ -174,7 +164,6 @@ def check_autoscaling_event(autoscaling_events: list[dict]) -> bool:
     (
         # TEST SETUP
         create_bucket
-        >> upload_file
         # TEST BODY
         >> start_python_job_async
         >> [
diff --git a/tests/system/providers/google/cloud/dataflow/example_dataflow_streaming_python.py b/tests/system/providers/google/cloud/dataflow/example_dataflow_streaming_python.py
index 63e5c780e3ee6..a858ad90aa939 100644
--- a/tests/system/providers/google/cloud/dataflow/example_dataflow_streaming_python.py
+++ b/tests/system/providers/google/cloud/dataflow/example_dataflow_streaming_python.py
@@ -82,10 +82,10 @@
             "output_topic": f"projects/{PROJECT_ID}/topics/{TOPIC_ID}",
             "streaming": True,
         },
-        py_requirements=["apache-beam[gcp]==2.46.0"],
+        py_requirements=["apache-beam[gcp]==2.47.0"],
         py_interpreter="python3",
         py_system_site_packages=False,
-        dataflow_config={"location": LOCATION},
+        dataflow_config={"location": LOCATION, "job_name": "start_python_job_streaming"},
     )
     # [END howto_operator_start_streaming_python_job]
diff --git a/tests/system/providers/google/cloud/dataflow/resources/wordcount_debugging.txt b/tests/system/providers/google/cloud/dataflow/resources/wordcount_debugging.txt
deleted file mode 100644
index efe7d205c495d..0000000000000
--- a/tests/system/providers/google/cloud/dataflow/resources/wordcount_debugging.txt
+++ /dev/null
@@ -1,168 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-"""
-An example that verifies the counts and includes best practices.
-
-On top of the basic concepts in the wordcount example, this workflow introduces
-logging to Cloud Logging, and using assertions in a Dataflow pipeline.
-To execute this pipeline locally, specify a local output file or output prefix
-on GCS::
-
-  --output [YOUR_LOCAL_FILE | gs://YOUR_OUTPUT_PREFIX]
-
-To execute this pipeline using the Google Cloud Dataflow service, specify
-pipeline configuration::
-
-  --project YOUR_PROJECT_ID
-  --staging_location gs://YOUR_STAGING_DIRECTORY
-  --temp_location gs://YOUR_TEMP_DIRECTORY
-  --region GCE_REGION
-  --job_name YOUR_JOB_NAME
-  --runner DataflowRunner
-
-and an output prefix on GCS.
-"""
-
-# pytype: skip-file
-
-from __future__ import annotations
-
-import argparse
-import logging
-import re
-
-import apache_beam as beam
-from apache_beam.io import ReadFromText, WriteToText
-from apache_beam.metrics import Metrics
-from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions
-from apache_beam.testing.util import assert_that, equal_to
-
-
-class FilterTextFn(beam.DoFn):
-    """A DoFn that filters for a specific key based on a regular expression."""
-
-    def __init__(self, pattern):
-        # TODO(BEAM-6158): Revert the workaround once we can pickle super() on py3.
-        # super(FilterTextFn, self).__init__()
-        beam.DoFn.__init__(self)
-        self.pattern = pattern
-        # A custom metric can track values in your pipeline as it runs. Those
-        # values will be available in the monitoring system of the runner used
-        # to run the pipeline. These metrics below track the number of
-        # matched and unmatched words.
-        self.matched_words = Metrics.counter(self.__class__, "matched_words")
-        self.umatched_words = Metrics.counter(self.__class__, "umatched_words")
-
-    def process(self, element):
-        word, _ = element
-        if re.match(self.pattern, word):
-            # Log at INFO level each element we match. When executing this pipeline
-            # using the Dataflow service, these log lines will appear in the Cloud
-            # Logging UI.
-            logging.info("Matched %s", word)
-            self.matched_words.inc()
-            yield element
-        else:
-            # Log at the "DEBUG" level each element that is not matched. Different log
-            # levels can be used to control the verbosity of logging providing an
-            # effective mechanism to filter less important information.
-            # Note currently only "INFO" and higher level logs are emitted to the
-            # Cloud Logger. This log message will not be visible in the Cloud Logger.
-            logging.debug("Did not match %s", word)
-            self.umatched_words.inc()
-
-
-class CountWords(beam.PTransform):
-    """
-    A transform to count the occurrences of each word.
-
-    A PTransform that converts a PCollection containing lines of text into a
-    PCollection of (word, count) tuples.
-    """
-
-    def expand(self, pcoll):
-        def count_ones(word_ones):
-            (word, ones) = word_ones
-            return (word, sum(ones))
-
-        return (
-            pcoll
-            | "split" >> (beam.FlatMap(lambda x: re.findall(r"[A-Za-z\']+", x)).with_output_types(str))
-            | "pair_with_one" >> beam.Map(lambda x: (x, 1))
-            | "group" >> beam.GroupByKey()
-            | "count" >> beam.Map(count_ones)
-        )
-
-
-def run(argv=None, save_main_session=True):
-    """Run the debugging wordcount pipeline."""
-    parser = argparse.ArgumentParser()
-    parser.add_argument(
-        "--input",
-        dest="input",
-        default="gs://dataflow-samples/shakespeare/kinglear.txt",
-        help="Input file to process.",
-    )
-    parser.add_argument("--output", dest="output", required=True, help="Output file to write results to.")
-    known_args, pipeline_args = parser.parse_known_args(argv)
-    # We use the save_main_session option because one or more DoFn's in this
-    # workflow rely on global context (e.g., a module imported at module level).
-    pipeline_options = PipelineOptions(pipeline_args)
-    pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
-    with beam.Pipeline(options=pipeline_options) as p:
-
-        # Read the text file[pattern] into a PCollection, count the occurrences of
-        # each word and filter by a list of words.
-        filtered_words = (
-            p
-            | "read" >> ReadFromText(known_args.input)
-            | CountWords()
-            | "FilterText" >> beam.ParDo(FilterTextFn("Flourish|stomach"))
-        )
-
-        # assert_that is a convenient PTransform that checks a PCollection has an
-        # expected value. Asserts are best used in unit tests with small data sets
-        # but is demonstrated here as a teaching tool.
-        #
-        # Note assert_that does not provide any output and that successful
-        # completion of the Pipeline implies that the expectations were met. Learn
-        # more at https://cloud.google.com/dataflow/pipelines/testing-your-pipeline
-        # on how to best test your pipeline.
-        assert_that(filtered_words, equal_to([("Flourish", 3), ("stomach", 1)]))
-
-        # Format the counts into a PCollection of strings and write the output using
-        # a "Write" transform that has side effects.
-        # pylint: disable=unused-variable
-        def format_result(word_count):
-            (word, count) = word_count
-            return f"{word}: {count}"
-
-        _ = filtered_words | "format" >> beam.Map(format_result) | "write" >> WriteToText(known_args.output)
-
-
-if __name__ == "__main__":
-    # Cloud Logging would contain only logging.INFO and higher level logs logged
-    # by the root logger. All log statements emitted by the root logger will be
-    # visible in the Cloud Logging UI. Learn more at
-    # https://cloud.google.com/logging about the Cloud Logging UI.
-    #
-    # You can set the default logging level to a different level when running
-    # locally.
-    logging.getLogger().setLevel(logging.INFO)
-    run()
diff --git a/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_dataset.py b/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_dataset.py
index e7f5d7d920acd..8ad202f292a81 100644
--- a/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_dataset.py
+++ b/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_dataset.py
@@ -100,10 +100,8 @@
         "data_item_labels": {
             "test-labels-name": "test-labels-value",
         },
-        "import_schema_uri": (
-            "gs://google-cloud-aiplatform/schema/dataset/ioformat/image_bounding_box_io_format_1.0.0.yaml"
-        ),
-        "gcs_source": {"uris": ["gs://cloud-samples-data/vision/salads.csv"]},
+        "import_schema_uri": "image_classification_single_label_io_format_1.0.0.yaml",
+        "gcs_source": {"uris": [f"gs://{DATA_SAMPLE_GCS_BUCKET_NAME}/vertex-ai/image-dataset-flowers.csv"]},
     },
 ]
 DATASET_TO_UPDATE = {"display_name": "test-name"}
diff --git a/tests/system/providers/google/cloud/vision/example_vision_annotate_image.py b/tests/system/providers/google/cloud/vision/example_vision_annotate_image.py
index a72c296d904a3..dfef99c465ffc 100644
--- a/tests/system/providers/google/cloud/vision/example_vision_annotate_image.py
+++ b/tests/system/providers/google/cloud/vision/example_vision_annotate_image.py
@@ -71,7 +71,7 @@
 # Public bucket holding the sample data
 BUCKET_NAME_SRC = "cloud-samples-data"
 # Path to the data inside the public bucket
-PATH_SRC = "vision/ocr/sign.jpg"
+PATH_SRC = "vision/logo/google_logo.jpg"
 
 with DAG(