From 8d6a0a1be403a5a839d3c10dec593cfa9465e15e Mon Sep 17 00:00:00 2001
From: vatsrahul1001 <43964496+vatsrahul1001@users.noreply.github.com>
Date: Mon, 26 Aug 2024 17:26:54 +0530
Subject: [PATCH] Update Databricks workflow example DAG (#41700)

* update databricks workflow example dag

* Format commentted code

* Update tests/system/providers/databricks/example_databricks_workflow.py

* Rearrange comment block

* removing comment

---------

Co-authored-by: Pankaj Koti
---
 .../databricks/example_databricks_workflow.py | 32 +++++++++----------
 1 file changed, 16 insertions(+), 16 deletions(-)

diff --git a/tests/system/providers/databricks/example_databricks_workflow.py b/tests/system/providers/databricks/example_databricks_workflow.py
index e94e775a3c0a4..6dfec572510f3 100644
--- a/tests/system/providers/databricks/example_databricks_workflow.py
+++ b/tests/system/providers/databricks/example_databricks_workflow.py
@@ -32,37 +32,38 @@
 EXECUTION_TIMEOUT = int(os.getenv("EXECUTION_TIMEOUT", 6))
 
-DATABRICKS_CONN_ID = os.getenv("DATABRICKS_CONN_ID", "databricks_conn")
+DATABRICKS_CONN_ID = os.getenv("DATABRICKS_CONN_ID", "databricks_default")
 DATABRICKS_NOTIFICATION_EMAIL = os.getenv("DATABRICKS_NOTIFICATION_EMAIL", "your_email@serviceprovider.com")
 
 GROUP_ID = os.getenv("DATABRICKS_GROUP_ID", "1234").replace(".", "_")
 
 USER = os.environ.get("USER")
 
-QUERY_ID = os.environ.get("QUERY_ID", "c9cf6468-babe-41a6-abc3-10ac358c71ee")
-WAREHOUSE_ID = os.environ.get("WAREHOUSE_ID", "cf414a2206dfb397")
+QUERY_ID = os.environ.get("QUERY_ID", "d3773b5a-56f9-422c-ae60-048eaa90aa33")
+WAREHOUSE_ID = os.environ.get("WAREHOUSE_ID", "368fe30b92228713")
 
+# job_cluster_spec example for Databricks on Azure
 job_cluster_spec = [
     {
         "job_cluster_key": "Shared_job_cluster",
         "new_cluster": {
             "cluster_name": "",
             "spark_version": "11.3.x-scala2.12",
-            "aws_attributes": {
-                "first_on_demand": 1,
-                "availability": "SPOT_WITH_FALLBACK",
-                "zone_id": "us-east-2b",
-                "spot_bid_price_percent": 100,
-                "ebs_volume_count": 0,
+            "azure_attributes": {
+                "availability": "ON_DEMAND_AZURE",
+                "spot_bid_max_price": -1,
             },
-            "node_type_id": "i3.xlarge",
+            "num_workers": 1,
+            "spark_conf": {},
+            "node_type_id": "Standard_D3_v2",
+            "ssh_public_keys": [],
+            "custom_tags": {},
             "spark_env_vars": {"PYSPARK_PYTHON": "/databricks/python3/bin/python3"},
-            "enable_elastic_disk": False,
-            "data_security_mode": "LEGACY_SINGLE_USER_STANDARD",
-            "runtime_engine": "STANDARD",
-            "num_workers": 8,
+            "cluster_source": "JOB",
+            "init_scripts": [],
         },
     }
 ]
+
 dag = DAG(
     dag_id="example_databricks_workflow",
     start_date=datetime(2022, 1, 1),
@@ -122,14 +123,13 @@
                 },
                 "libraries": [
                     {"pypi": {"package": "Faker"}},
-                    {"pypi": {"package": "simplejson"}},
                 ],
             },
         )
 
         sql_query = DatabricksTaskOperator(
             task_id="sql_query",
-            databricks_conn_id="databricks_conn",
+            databricks_conn_id=DATABRICKS_CONN_ID,
             task_config={
                 "sql_task": {
                     "query": {
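
For context, the job_cluster_spec this patch rewrites for Azure only takes effect once it is passed to a DatabricksWorkflowTaskGroup, which launches a single Databricks job and shares the cluster across the tasks inside it; that wiring lives in the same example file, outside the hunks shown above. The following is a minimal sketch of that pattern, not the exact code from the patched DAG: it reuses dag, DATABRICKS_CONN_ID, and job_cluster_spec as defined earlier in the file, and the group id "example_workflow" and notebook path "/Shared/Notebook_1" are illustrative placeholders, not values confirmed by this diff.

    from airflow.providers.databricks.operators.databricks import DatabricksNotebookOperator
    from airflow.providers.databricks.operators.databricks_workflow import DatabricksWorkflowTaskGroup

    with dag:
        # One Databricks job is created for the whole group; job_clusters reuses
        # the Azure spec defined above, keyed by "Shared_job_cluster".
        task_group = DatabricksWorkflowTaskGroup(
            group_id="example_workflow",  # placeholder group id
            databricks_conn_id=DATABRICKS_CONN_ID,
            job_clusters=job_cluster_spec,
        )
        with task_group:
            # Tasks opt into the shared job cluster via job_cluster_key.
            notebook = DatabricksNotebookOperator(
                task_id="notebook_task",
                databricks_conn_id=DATABRICKS_CONN_ID,
                notebook_path="/Shared/Notebook_1",  # placeholder path
                source="WORKSPACE",
                job_cluster_key="Shared_job_cluster",
            )

Routing every operator through DATABRICKS_CONN_ID, as the second hunk of the patch does for sql_query, keeps the whole DAG on one configurable connection instead of a hardcoded "databricks_conn" string.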