Skip to content

Commit

Permalink
Move self.build resetting from __init__ to execute method
Browse files Browse the repository at this point in the history
  • Loading branch information
e-galan committed Jul 31, 2024
1 parent 3905772 commit d8fbaf7
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 19 deletions.
9 changes: 5 additions & 4 deletions airflow/providers/google/cloud/operators/cloud_build.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,19 +201,20 @@ def __init__(

def prepare_template(self) -> None:
# if no file is specified, skip
if not isinstance(self.build_raw, str):
if not isinstance(self.build, str):
return
with open(self.build_raw) as file:
if self.build_raw.endswith((".yaml", ".yml")):
with open(self.build) as file:
if self.build.endswith((".yaml", ".yml")):
self.build = yaml.safe_load(file.read())
if self.build_raw.endswith(".json"):
elif self.build.endswith(".json"):
self.build = json.loads(file.read())

def execute(self, context: Context):
hook = CloudBuildHook(
gcp_conn_id=self.gcp_conn_id,
impersonation_chain=self.impersonation_chain,
)
self.prepare_template()
build = BuildProcessor(build=self.build).process_body()

self.cloud_build_operation, self.id_ = hook.create_build_without_waiting_for_result(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,47 +15,45 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
Example Airflow DAG that displays interactions with Google Cloud Build.
"""
"""Example Airflow DAG that displays interactions with Google Cloud Build."""

from __future__ import annotations

import os
from datetime import datetime
from pathlib import Path
from typing import Any, cast

import yaml

from airflow.decorators import task_group
from airflow.models.dag import DAG
from airflow.models.xcom_arg import XComArg
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow.providers.google.cloud.operators.cloud_build import (
CloudBuildCancelBuildOperator,
CloudBuildCreateBuildOperator,
CloudBuildGetBuildOperator,
CloudBuildListBuildsOperator,
CloudBuildRetryBuildOperator,
)
from airflow.providers.google.cloud.transfers.gcs_to_local import GCSToLocalFilesystemOperator
from tests.system.providers.google import DEFAULT_GCP_SYSTEM_TEST_PROJECT_ID

ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID", "default")
PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT") or DEFAULT_GCP_SYSTEM_TEST_PROJECT_ID

DAG_ID = "example_gcp_cloud_build"
DAG_ID = "gcp_cloud_build"

PUBLIC_BUCKET = "airflow-system-tests-resources"
GCP_SOURCE_ARCHIVE_URL = "gs://airflow-system-tests-resources/cloud-build/file.tar.gz"
REMOTE_YAML_FILE_OBJECT = "cloud-build/example-cloud-build.yaml"
LOCAL_YAML_FILE_NAME = "example_cloud_build.yaml"
# Repository with this name is expected created within the project $SYSTEM_TESTS_GCP_PROJECT
# If you'd like to run this system test locally, please
# 1. Create Cloud Source Repository
# 2. Push into a master branch the following file:
# tests/system/providers/google/cloud/cloud_build/resources/example_cloud_build.yaml
GCP_SOURCE_REPOSITORY_NAME = "test-cloud-build-repo"

CURRENT_FOLDER = Path(__file__).parent

# [START howto_operator_gcp_create_build_from_storage_body]
CREATE_BUILD_FROM_STORAGE_BODY = {
"source": {"storage_source": GCP_SOURCE_ARCHIVE_URL},
Expand All @@ -76,8 +74,19 @@
schedule="@once",
start_date=datetime(2021, 1, 1),
catchup=False,
tags=["example"],
tags=["example", "cloud", "build"],
) as dag:
download_yaml_file = GCSToLocalFilesystemOperator(
task_id="download_yaml_file",
object_name=REMOTE_YAML_FILE_OBJECT,
bucket=PUBLIC_BUCKET,
filename=LOCAL_YAML_FILE_NAME,
)

get_yaml_file_path = PythonOperator(
task_id="get_yaml_file_path",
python_callable=lambda: LOCAL_YAML_FILE_NAME,
)

@task_group(group_id="build_from_storage")
def build_from_storage():
Expand Down Expand Up @@ -157,7 +166,7 @@ def build_from_repo_deferrable():
create_build_from_file = CloudBuildCreateBuildOperator(
task_id="create_build_from_file",
project_id=PROJECT_ID,
build=yaml.safe_load((Path(CURRENT_FOLDER) / "resources" / "example_cloud_build.yaml").read_text()),
build=cast(str, XComArg(get_yaml_file_path, key="return_value")),
params={"name": "Airflow"},
)
# [END howto_operator_gcp_create_build_from_yaml_body]
Expand All @@ -166,7 +175,7 @@ def build_from_repo_deferrable():
create_build_from_file_deferrable = CloudBuildCreateBuildOperator(
task_id="create_build_from_file_deferrable",
project_id=PROJECT_ID,
build=yaml.safe_load((Path(CURRENT_FOLDER) / "resources" / "example_cloud_build.yaml").read_text()),
build=cast(str, XComArg(get_yaml_file_path, key="return_value")),
params={"name": "Airflow"},
deferrable=True,
)
Expand Down Expand Up @@ -249,9 +258,12 @@ def no_wait_cancel_retry_get_deferrable():

create_build_without_wait >> cancel_build >> retry_build >> get_build

# TEST BODY
(
[
# TEST SETUP
download_yaml_file
>> get_yaml_file_path
# TEST BODY
>> [
build_from_storage(),
build_from_storage_deferrable(),
build_from_repo(),
Expand Down

0 comments on commit d8fbaf7

Please sign in to comment.