From e7b493712d4356a40a56f0fd76c9ade600328746 Mon Sep 17 00:00:00 2001
From: Pranshu Pandya
Date: Tue, 12 Nov 2024 13:30:46 +0100
Subject: [PATCH] Added support for job_parameters and dbt_commands in DatabricksRunNowOperator (#43895)

---
 .../databricks/operators/databricks.py       | 31 ++++++++++++++++++-
 .../databricks/operators/test_databricks.py  |  5 +++
 2 files changed, 35 insertions(+), 1 deletion(-)

diff --git a/providers/src/airflow/providers/databricks/operators/databricks.py b/providers/src/airflow/providers/databricks/operators/databricks.py
index 58ffaeaece582..03da528468d90 100644
--- a/providers/src/airflow/providers/databricks/operators/databricks.py
+++ b/providers/src/airflow/providers/databricks/operators/databricks.py
@@ -676,7 +676,7 @@ class DatabricksRunNowOperator(BaseOperator):
 
         json = {
             "job_id": 42,
-            "notebook_params": {"dry-run": "true", "oldest-time-to-consider": "1457570074236"},
+            "job_parameters": {"dry-run": "true", "oldest-time-to-consider": "1457570074236"},
         }
 
         notebook_run = DatabricksRunNowOperator(task_id="notebook_run", json=json)
@@ -688,6 +688,8 @@ class DatabricksRunNowOperator(BaseOperator):
 
         job_id = 42
 
+        dbt_commands = ["dbt deps", "dbt seed", "dbt run"]
+
         notebook_params = {"dry-run": "true", "oldest-time-to-consider": "1457570074236"}
 
         python_params = ["douglas adams", "42"]
@@ -698,6 +700,7 @@ class DatabricksRunNowOperator(BaseOperator):
 
         notebook_run = DatabricksRunNowOperator(
             job_id=job_id,
+            dbt_commands=dbt_commands,
             notebook_params=notebook_params,
             python_params=python_params,
             jar_params=jar_params,
@@ -711,7 +714,9 @@ class DatabricksRunNowOperator(BaseOperator):
     Currently the named parameters that ``DatabricksRunNowOperator`` supports are
         - ``job_id``
         - ``job_name``
+        - ``job_parameters``
         - ``json``
+        - ``dbt_commands``
         - ``notebook_params``
         - ``python_params``
         - ``python_named_parameters``
@@ -731,6 +736,17 @@ class DatabricksRunNowOperator(BaseOperator):
         It must exist only one job with the specified name.
         ``job_id`` and ``job_name`` are mutually exclusive.
         This field will be templated.
+
+    :param job_parameters: A dict from keys to values that override or augment the job's
+        parameters for this run. Job parameters are passed to any of the job's tasks that
+        accept key-value parameters. Job parameters supersede ``notebook_params``, ``python_params``,
+        ``python_named_parameters``, ``jar_params``, and ``spark_submit_params``, and cannot be used
+        in combination with them.
+        This field will be templated.
+
+        .. seealso::
+            https://docs.databricks.com/en/workflows/jobs/settings.html#add-parameters-for-all-job-tasks
+
     :param json: A JSON object containing API parameters which will be passed
         directly to the ``api/2.1/jobs/run-now`` endpoint. The other named parameters
         (i.e. ``notebook_params``, ``spark_submit_params``..) to this operator will
@@ -741,6 +757,13 @@ class DatabricksRunNowOperator(BaseOperator):
         .. seealso::
             For more information about templating see :ref:`concepts:jinja-templating`.
             https://docs.databricks.com/dev-tools/api/latest/jobs.html#operation/JobsRunNow
+
+    :param dbt_commands: A list containing the dbt commands to run using the dbt command line
+        interface. This field will be templated.
+
+        .. seealso::
+            https://docs.databricks.com/en/jobs/dbt.html
+
     :param notebook_params: A dict from keys to values for jobs with notebook task,
         e.g. "notebook_params": {"name": "john doe", "age": "35"}.
         The map is passed to the notebook and will be accessible through the
@@ -832,7 +855,9 @@ def __init__(
         *,
         job_id: str | None = None,
         job_name: str | None = None,
+        job_parameters: dict[str, str] | None = None,
         json: Any | None = None,
+        dbt_commands: list[str] | None = None,
         notebook_params: dict[str, str] | None = None,
         python_params: list[str] | None = None,
         jar_params: list[str] | None = None,
@@ -884,6 +909,10 @@ def __init__(
             self.json["spark_submit_params"] = spark_submit_params
         if idempotency_token is not None:
             self.json["idempotency_token"] = idempotency_token
+        if job_parameters is not None:
+            self.json["job_parameters"] = job_parameters
+        if dbt_commands is not None:
+            self.json["dbt_commands"] = dbt_commands
         if self.json:
             self.json = normalise_json_content(self.json)
         # This variable will be used in case our task gets killed.
diff --git a/providers/tests/databricks/operators/test_databricks.py b/providers/tests/databricks/operators/test_databricks.py
index d0058eb2c7d8b..1cbe447667709 100644
--- a/providers/tests/databricks/operators/test_databricks.py
+++ b/providers/tests/databricks/operators/test_databricks.py
@@ -66,6 +66,7 @@
 JOB_ID = "42"
 JOB_NAME = "job-name"
 JOB_DESCRIPTION = "job-description"
+DBT_COMMANDS = ["dbt deps", "dbt seed", "dbt run"]
 NOTEBOOK_PARAMS = {"dry-run": "true", "oldest-time-to-consider": "1457570074236"}
 JAR_PARAMS = ["param1", "param2"]
 RENDERED_TEMPLATED_JAR_PARAMS = [f"/test-{DATE}"]
@@ -1179,6 +1180,7 @@ def test_init_with_json(self):
         Test the initializer with json data.
         """
         json = {
+            "dbt_commands": DBT_COMMANDS,
             "notebook_params": NOTEBOOK_PARAMS,
             "jar_params": JAR_PARAMS,
             "python_params": PYTHON_PARAMS,
@@ -1190,6 +1192,7 @@ def test_init_with_json(self):
 
         expected = utils.normalise_json_content(
             {
+                "dbt_commands": DBT_COMMANDS,
                 "notebook_params": NOTEBOOK_PARAMS,
                 "jar_params": JAR_PARAMS,
                 "python_params": PYTHON_PARAMS,
@@ -1215,6 +1218,7 @@ def test_init_with_merging(self):
             task_id=TASK_ID,
             json=json,
             job_id=JOB_ID,
+            dbt_commands=DBT_COMMANDS,
             notebook_params=override_notebook_params,
             python_params=PYTHON_PARAMS,
             jar_params=override_jar_params,
@@ -1223,6 +1227,7 @@ def test_init_with_merging(self):
 
         expected = utils.normalise_json_content(
             {
+                "dbt_commands": DBT_COMMANDS,
                 "notebook_params": override_notebook_params,
                 "jar_params": override_jar_params,
                 "python_params": PYTHON_PARAMS,
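Note for reviewers: below is a minimal DAG sketch exercising the two new parameters end to end, assembled from the docstring examples in this patch. The DAG id, start date, job id, and job name are placeholder values and not part of the change; ``databricks_default`` is the provider's default connection id. Because ``job_parameters`` supersedes the task-level run parameters, it is passed on its own task.

```python
# Usage sketch for the parameters introduced in this patch. Assumes the
# Databricks provider with this change installed; the DAG id, dates, job id,
# and job name below are illustrative placeholders.
from __future__ import annotations

from datetime import datetime

from airflow import DAG
from airflow.providers.databricks.operators.databricks import DatabricksRunNowOperator

with DAG(
    dag_id="databricks_run_now_example",
    start_date=datetime(2024, 11, 1),
    schedule=None,
    catchup=False,
):
    # job_parameters supersedes notebook_params, python_params,
    # python_named_parameters, jar_params, and spark_submit_params,
    # so it is the only run parameter passed to this task.
    run_with_job_parameters = DatabricksRunNowOperator(
        task_id="run_with_job_parameters",
        databricks_conn_id="databricks_default",
        job_id=42,  # placeholder job id
        job_parameters={"dry-run": "true", "oldest-time-to-consider": "1457570074236"},
    )

    # dbt_commands is forwarded in the run-now payload, for jobs that
    # contain a dbt task.
    run_dbt = DatabricksRunNowOperator(
        task_id="run_dbt",
        databricks_conn_id="databricks_default",
        job_name="nightly-dbt",  # placeholder; mutually exclusive with job_id
        dbt_commands=["dbt deps", "dbt seed", "dbt run"],
    )

    run_with_job_parameters >> run_dbt
```

Both tasks end up POSTing to ``api/2.1/jobs/run-now``; as the ``__init__`` hunk above shows, the new keys are merged into ``self.json`` exactly like the existing named parameters.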