diff --git a/providers/src/airflow/providers/google/cloud/hooks/dataproc.py b/providers/src/airflow/providers/google/cloud/hooks/dataproc.py
index 456fd13858f5c..b970e212691a2 100644
--- a/providers/src/airflow/providers/google/cloud/hooks/dataproc.py
+++ b/providers/src/airflow/providers/google/cloud/hooks/dataproc.py
@@ -22,6 +22,9 @@
 import time
 import uuid
 from collections.abc import MutableSequence
+import shlex
+import subprocess
+import re
 from typing import TYPE_CHECKING, Any, Sequence

 from google.api_core.client_options import ClientOptions
@@ -274,6 +277,53 @@ def get_operations_client(self, region: str | None):
         """Create a OperationsClient."""
         return self.get_batch_client(region=region).transport.operations_client

+    def dataproc_options_to_args(self, options: dict) -> list[str]:
+        """
+        Return a list of gcloud command arguments built from a dictionary of options.
+
+        :param options: Dictionary mapping flag names to values
+        :return: List of formatted command-line arguments
+        """
+        if not options:
+            return []
+
+        args: list[str] = []
+        for attr, value in options.items():
+            self.log.info("Attribute: %s, value: %s", attr, value)
+            if value is None or (isinstance(value, bool) and value):
+                args.append(f"--{attr}")
+            elif isinstance(value, bool) and not value:
+                continue
+            elif isinstance(value, list):
+                args.extend([f"--{attr}={v}" for v in value])
+            else:
+                args.append(f"--{attr}={value}")
+        self.log.info("Built gcloud arguments: %s", args)
+        return args
+
+    def _build_gcloud_command(self, command: list[str], parameters: dict[str, str]) -> list[str]:
+        return [*command, *self.dataproc_options_to_args(parameters)]
+
+    def _create_dataproc_cluster_with_gcloud(self, cmd: list[str]) -> str:
+        """Create a Dataproc cluster on GKE with a gcloud command and return the command's output."""
+        self.log.info("Executing command: %s", " ".join(shlex.quote(c) for c in cmd))
+        success_code = 0
+
+        with self.provide_authorized_gcloud():
+            proc = subprocess.run(cmd, capture_output=True)
+
+        if proc.returncode != success_code:
+            stderr_last_20_lines = "\n".join(proc.stderr.decode().strip().splitlines()[-20:])
+            raise AirflowException(
+                f"Process exited with a non-zero exit code {proc.returncode}. Error details: "
+                f"{stderr_last_20_lines}"
+            )
+
+        response = proc.stdout.decode().strip()
+        self.log.info("Cluster creation command output: %s", response)
+
+        return response
+
     def wait_for_operation(
         self,
         operation: Operation,
@@ -302,7 +352,7 @@ def create_cluster(
         retry: Retry | _MethodDefault = DEFAULT,
         timeout: float | None = None,
         metadata: Sequence[tuple[str, str]] = (),
-    ) -> Operation:
+    ) -> Operation | str:
         """
         Create a cluster in a specified project.

@@ -339,6 +389,30 @@ def create_cluster(
             "project_id": project_id,
             "cluster_name": cluster_name,
         }
+        # If the virtual cluster config targets a GKE cluster, create the Dataproc cluster via the gcloud CLI.
+        if virtual_cluster_config and "kubernetes_cluster_config" in virtual_cluster_config:
+            self.log.info("Kubernetes cluster config: %s", virtual_cluster_config["kubernetes_cluster_config"])
+            kube_config = virtual_cluster_config["kubernetes_cluster_config"]["gke_cluster_config"]
+            gke_cluster_name = kube_config["gke_cluster_target"].rsplit("/", 1)[1]
+            gke_pools = kube_config["node_pool_target"][0]
+            gke_pool_name = gke_pools["node_pool"].rsplit("/", 1)[1]
+            gke_pool_role = gke_pools["roles"][0]
+            gke_pool_machine_type = gke_pools["node_pool_config"]["config"]["machine_type"]
+            gcp_flags = {
+                "region": region,
+                "gke-cluster": gke_cluster_name,
+                "spark-engine-version": "3",
+                "pools": f"name={gke_pool_name},roles={gke_pool_role.lower()},machineType={gke_pool_machine_type},min=1,max=10",
+                "setup-workload-identity": None,
+            }
+            self.log.info("GKE cluster flags: %s", gcp_flags)
+            cmd = self._build_gcloud_command(
+                command=["gcloud", "dataproc", "clusters", "gke", "create", cluster_name], parameters=gcp_flags
+            )
+            response = self._create_dataproc_cluster_with_gcloud(cmd=cmd)
+            self.log.info("Dataproc cluster on GKE created, gcloud output: %s", response)
+            return response
+
         if virtual_cluster_config is not None:
             cluster["virtual_cluster_config"] = virtual_cluster_config  # type: ignore
         if cluster_config is not None:
diff --git a/providers/src/airflow/providers/google/cloud/operators/dataproc.py b/providers/src/airflow/providers/google/cloud/operators/dataproc.py
index 13be5b240cfc9..54720a92cb3f8 100644
--- a/providers/src/airflow/providers/google/cloud/operators/dataproc.py
+++ b/providers/src/airflow/providers/google/cloud/operators/dataproc.py
@@ -800,6 +800,7 @@ def _retry_cluster_creation(self, hook: DataprocHook):
         self._wait_for_cluster_in_deleting_state(hook)
         self.log.info("Starting a new creation for Cluster %s", self.cluster_name)
         operation = self._create_cluster(hook)
+        self.log.info("Waiting for the retried cluster creation operation to complete")
         cluster = hook.wait_for_operation(timeout=self.timeout, result_retry=self.retry, operation=operation)
         self.log.info("Cluster created.")
         return Cluster.to_dict(cluster)
@@ -822,7 +823,9 @@ def execute(self, context: Context) -> dict:
         try:
             # First try to create a new cluster
             operation = self._create_cluster(hook)
-            if not self.deferrable:
+            self.log.info("Cluster creation returned a result of type %s", type(operation))
+            if not self.deferrable and not isinstance(operation, str):
+                self.log.info("Waiting for the cluster creation operation to complete")
                 cluster = hook.wait_for_operation(
                     timeout=self.timeout, result_retry=self.retry, operation=operation
                 )
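For context, a minimal standalone sketch of the gcloud invocation the new hook helpers assemble. It mirrors the argument-building logic of dataproc_options_to_args and _build_gcloud_command above; the cluster, pool, machine type, and region values are illustrative placeholders, not values taken from this change:

    # Hypothetical example flags, shaped like the gcp_flags dict built in create_cluster.
    gcp_flags = {
        "region": "us-central1",               # assumed example region
        "gke-cluster": "example-gke-cluster",  # assumed example GKE cluster name
        "spark-engine-version": "3",
        "pools": "name=example-pool,roles=default,machineType=e2-standard-4,min=1,max=10",
        "setup-workload-identity": None,       # valueless flag
    }

    args = []
    for attr, value in gcp_flags.items():
        if value is None or (isinstance(value, bool) and value):
            args.append(f"--{attr}")           # bare flag with no value
        elif isinstance(value, bool) and not value:
            continue                           # disabled boolean flags are dropped
        elif isinstance(value, list):
            args.extend(f"--{attr}={v}" for v in value)
        else:
            args.append(f"--{attr}={value}")

    cmd = ["gcloud", "dataproc", "clusters", "gke", "create", "example-cluster", *args]
    # Resulting command, roughly:
    #   gcloud dataproc clusters gke create example-cluster --region=us-central1
    #   --gke-cluster=example-gke-cluster --spark-engine-version=3
    #   --pools=name=example-pool,roles=default,machineType=e2-standard-4,min=1,max=10
    #   --setup-workload-identity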