Commit

creation works, need unit tests
Ulada Zakharava committed Nov 4, 2024
1 parent 19ddb02 commit 1e7dcf9
Showing 2 changed files with 79 additions and 2 deletions.
76 changes: 75 additions & 1 deletion providers/src/airflow/providers/google/cloud/hooks/dataproc.py
@@ -22,6 +22,9 @@
import time
import uuid
from collections.abc import MutableSequence
import re
import shlex
import subprocess
from typing import TYPE_CHECKING, Any, Sequence

from google.api_core.client_options import ClientOptions
@@ -274,6 +277,53 @@ def get_operations_client(self, region: str | None):
"""Create a OperationsClient."""
return self.get_batch_client(region=region).transport.operations_client

def dataflow_options_to_args(self, options: dict) -> list[str]:
"""
Return a list of gcloud command-line arguments built from a dictionary of options.

:param options: Dictionary with options
:return: List of arguments
"""
if not options:
return []

args: list[str] = []
for attr, value in options.items():
self.log.info("Attribute: %s, value: %s", attr, value)
if value is None or (isinstance(value, bool) and value):
args.append(f"--{attr}")
elif isinstance(value, bool) and not value:
continue
elif isinstance(value, list):
args.extend([f"--{attr}={v}" for v in value])
else:
args.append(f"--{attr}={value}")
self.log.info("----------------------------------SOME options: %s", args)
return args

def _build_gcloud_command(self, command: list[str], parameters: dict[str, str]) -> list[str]:
return [*command, *(self.dataflow_options_to_args(parameters))]

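A minimal standalone sketch of the conversion rules implemented by dataflow_options_to_args above, for readers skimming the diff: None and True become bare flags, False values are dropped, lists repeat the flag, and everything else becomes a key=value pair. The helper name, flag names, and values below are illustrative only and not part of the commit.

def options_to_args_sketch(options: dict) -> list[str]:
    args: list[str] = []
    for attr, value in options.items():
        if value is None or (isinstance(value, bool) and value):
            args.append(f"--{attr}")  # bare flag
        elif isinstance(value, bool) and not value:
            continue  # False flags are omitted entirely
        elif isinstance(value, list):
            args.extend(f"--{attr}={v}" for v in value)  # repeated flag
        else:
            args.append(f"--{attr}={value}")  # key=value pair
    return args

print(options_to_args_sketch({"region": "us-central1", "spark-engine-version": "3", "setup-workload-identity": None}))
# ['--region=us-central1', '--spark-engine-version=3', '--setup-workload-identity']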
def _create_dataflow_cluster_with_gcloud(self, cmd: list[str]) -> str:
"""Create a Dataflow cluster with a gcloud command and return the job's ID."""
self.log.info("Executing command: %s", " ".join(shlex.quote(c) for c in cmd))
success_code = 0

with self.provide_authorized_gcloud():
proc = subprocess.run(cmd, capture_output=True)

if proc.returncode != success_code:
stderr_last_20_lines = "\n".join(proc.stderr.decode().strip().splitlines()[-20:])
raise AirflowException(
f"Process exit with non-zero exit code. Exit code: {proc.returncode}. Error Details : "
f"{stderr_last_20_lines}"
)

response = proc.stdout.decode().strip()
self.log.info("----------SOME RESPONSE AFTER RUNNING COMMAND: %s", response)

return response

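A standalone sketch of the subprocess pattern used by the method above: run the command, keep only the last lines of stderr for the error message, and return stripped stdout on success. The helper name and RuntimeError are stand-ins; the real method additionally wraps the call in provide_authorized_gcloud() and raises AirflowException.

import shlex
import subprocess


def run_gcloud_sketch(cmd: list[str]) -> str:
    print("Executing command:", " ".join(shlex.quote(c) for c in cmd))
    proc = subprocess.run(cmd, capture_output=True)
    if proc.returncode != 0:
        # Keep only the tail of stderr so the raised error stays readable.
        stderr_tail = "\n".join(proc.stderr.decode().strip().splitlines()[-20:])
        raise RuntimeError(f"Exit code {proc.returncode}. Error details: {stderr_tail}")
    return proc.stdout.decode().strip()

# e.g. run_gcloud_sketch(["gcloud", "version"])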
def wait_for_operation(
self,
operation: Operation,
@@ -302,7 +352,7 @@ def create_cluster(
retry: Retry | _MethodDefault = DEFAULT,
timeout: float | None = None,
metadata: Sequence[tuple[str, str]] = (),
) -> Operation:
) -> Operation | str:
"""
Create a cluster in a specified project.
Expand Down Expand Up @@ -339,6 +389,30 @@ def create_cluster(
"project_id": project_id,
"cluster_name": cluster_name,
}
# If the virtual cluster config targets GKE (kubernetes_cluster_config), create the cluster
# with a gcloud command instead of the Dataproc API client.
if virtual_cluster_config and "kubernetes_cluster_config" in virtual_cluster_config:
self.log.info("Kubernetes cluster config: %s", virtual_cluster_config["kubernetes_cluster_config"])
kube_config = virtual_cluster_config["kubernetes_cluster_config"]["gke_cluster_config"]
gke_cluster_name = kube_config["gke_cluster_target"].rsplit("/", 1)[1]
gke_pools = kube_config["node_pool_target"][0]
gke_pool_name = gke_pools["node_pool"].rsplit("/", 1)[1]
gke_pool_role = gke_pools["roles"][0]
gke_pool_machine_type = gke_pools["node_pool_config"]["config"]["machine_type"]
gcp_flags = {
"region": region,
"gke-cluster": gke_cluster_name,
"spark-engine-version": "3",
"pools": f"name={gke_pool_name},roles={gke_pool_role.lower()},machineType={gke_pool_machine_type},min=1,max=10",
"setup-workload-identity": None,
}
self.log.info("----------------GKE FLAGS: %s", gcp_flags)
cmd = self._build_gcloud_command(
command=["gcloud", "dataproc", "clusters", "gke", "create", cluster_name], parameters=gcp_flags
)
response = self._create_dataflow_cluster_with_gcloud(cmd=cmd)
self.log.info("AGAIN SOME RESPONSE IN CREATE_CLUSTER: %s, i think executed successfully", response)
return response

if virtual_cluster_config is not None:
cluster["virtual_cluster_config"] = virtual_cluster_config # type: ignore
if cluster_config is not None:
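For reference, the GKE branch above assembles a command roughly like the following before handing it to the subprocess helper. The cluster name, region, pool name, and machine type are made-up values; only the flag names come from the diff.

cmd = [
    "gcloud", "dataproc", "clusters", "gke", "create", "example-cluster",
    "--region=us-central1",
    "--gke-cluster=example-gke-cluster",
    "--spark-engine-version=3",
    "--pools=name=example-pool,roles=default,machineType=e2-standard-4,min=1,max=10",
    "--setup-workload-identity",
]
print(" ".join(cmd))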
5 changes: 4 additions & 1 deletion (second changed file; path not shown in this view)
@@ -800,6 +800,7 @@ def _retry_cluster_creation(self, hook: DataprocHook):
self._wait_for_cluster_in_deleting_state(hook)
self.log.info("Starting a new creation for Cluster %s", self.cluster_name)
operation = self._create_cluster(hook)
self.log.info("coming back from creation 2")
cluster = hook.wait_for_operation(timeout=self.timeout, result_retry=self.retry, operation=operation)
self.log.info("Cluster created.")
return Cluster.to_dict(cluster)
@@ -822,7 +823,9 @@ def execute(self, context: Context) -> dict:
try:
# First try to create a new cluster
operation = self._create_cluster(hook)
if not self.deferrable:
self.log.info("coming back from creation 1, %s", type(operation))
if not self.deferrable and type(operation) is not str:
self.log.info("inside if")
cluster = hook.wait_for_operation(
timeout=self.timeout, result_retry=self.retry, operation=operation
)
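The operator-side change exists because DataprocHook.create_cluster can now return either a long-running Operation or, on the gcloud/GKE path, the command output as a plain string. A minimal sketch of that dispatch, with stand-in names:

def resolve_create_result(result, wait_for_operation):
    """Return the result directly for the gcloud path, otherwise wait on the Operation."""
    if isinstance(result, str):
        # gcloud ran synchronously; its stdout is already the final outcome.
        return result
    # API client path: block until the long-running Operation completes.
    return wait_for_operation(result)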
