Skip to content

Commit

Permalink
Added support for job_parameters and dbt_commands in DatabricksRunNow…
Browse files Browse the repository at this point in the history
…Operator (apache#43895)
  • Loading branch information
pranshupand-db authored Nov 12, 2024
1 parent ac4dec1 commit e7b4937
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -676,7 +676,7 @@ class DatabricksRunNowOperator(BaseOperator):
json = {
"job_id": 42,
"notebook_params": {"dry-run": "true", "oldest-time-to-consider": "1457570074236"},
"job_parameters": {"dry-run": "true", "oldest-time-to-consider": "1457570074236"},
}
notebook_run = DatabricksRunNowOperator(task_id="notebook_run", json=json)
Expand All @@ -688,6 +688,8 @@ class DatabricksRunNowOperator(BaseOperator):
job_id = 42
dbt_commands = ["dbt deps", "dbt seed", "dbt run"]
notebook_params = {"dry-run": "true", "oldest-time-to-consider": "1457570074236"}
python_params = ["douglas adams", "42"]
Expand All @@ -698,6 +700,7 @@ class DatabricksRunNowOperator(BaseOperator):
notebook_run = DatabricksRunNowOperator(
job_id=job_id,
dbt_commands=dbt_commands,
notebook_params=notebook_params,
python_params=python_params,
jar_params=jar_params,
Expand All @@ -711,7 +714,9 @@ class DatabricksRunNowOperator(BaseOperator):
Currently the named parameters that ``DatabricksRunNowOperator`` supports are
- ``job_id``
- ``job_name``
- ``job_parameters``
- ``json``
- ``dbt_commands``
- ``notebook_params``
- ``python_params``
- ``python_named_parameters``
Expand All @@ -731,6 +736,17 @@ class DatabricksRunNowOperator(BaseOperator):
There must be exactly one job with the specified name.
``job_id`` and ``job_name`` are mutually exclusive.
This field will be templated.
:param job_parameters: A dict from keys to values that override or augment the job's
parameters for this run. Job parameters are passed to any of the job's tasks that
accept key-value parameters. Job parameters supersede ``notebook_params``, ``python_params``,
``python_named_parameters``, ``jar_params``, and ``spark_submit_params``, and cannot be
used in combination with any of them.
This field will be templated.
.. seealso::
https://docs.databricks.com/en/workflows/jobs/settings.html#add-parameters-for-all-job-tasks
:param json: A JSON object containing API parameters which will be passed
directly to the ``api/2.1/jobs/run-now`` endpoint. The other named parameters
(i.e. ``notebook_params``, ``spark_submit_params``..) to this operator will
Expand All @@ -741,6 +757,13 @@ class DatabricksRunNowOperator(BaseOperator):
.. seealso::
For more information about templating see :ref:`concepts:jinja-templating`.
https://docs.databricks.com/dev-tools/api/latest/jobs.html#operation/JobsRunNow
:param dbt_commands: A list containing the dbt commands to run using the dbt command line
interface. This field will be templated.
.. seealso::
https://docs.databricks.com/en/jobs/dbt.html
:param notebook_params: A dict from keys to values for jobs with notebook task,
e.g. "notebook_params": {"name": "john doe", "age": "35"}.
The map is passed to the notebook and will be accessible through the
Expand Down Expand Up @@ -832,7 +855,9 @@ def __init__(
*,
job_id: str | None = None,
job_name: str | None = None,
job_parameters: dict[str, str] | None = None,
json: Any | None = None,
dbt_commands: list[str] | None = None,
notebook_params: dict[str, str] | None = None,
python_params: list[str] | None = None,
jar_params: list[str] | None = None,
Expand Down Expand Up @@ -884,6 +909,10 @@ def __init__(
self.json["spark_submit_params"] = spark_submit_params
if idempotency_token is not None:
self.json["idempotency_token"] = idempotency_token
if job_parameters is not None:
self.json["job_parameters"] = job_parameters
if dbt_commands is not None:
self.json["dbt_commands"] = dbt_commands
if self.json:
self.json = normalise_json_content(self.json)
# This variable will be used in case our task gets killed.
Expand Down
5 changes: 5 additions & 0 deletions providers/tests/databricks/operators/test_databricks.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
JOB_ID = "42"
JOB_NAME = "job-name"
JOB_DESCRIPTION = "job-description"
DBT_COMMANDS = ["dbt deps", "dbt seed", "dbt run"]
NOTEBOOK_PARAMS = {"dry-run": "true", "oldest-time-to-consider": "1457570074236"}
JAR_PARAMS = ["param1", "param2"]
RENDERED_TEMPLATED_JAR_PARAMS = [f"/test-{DATE}"]
Expand Down Expand Up @@ -1179,6 +1180,7 @@ def test_init_with_json(self):
Test the initializer with json data.
"""
json = {
"dbt_commands": DBT_COMMANDS,
"notebook_params": NOTEBOOK_PARAMS,
"jar_params": JAR_PARAMS,
"python_params": PYTHON_PARAMS,
Expand All @@ -1190,6 +1192,7 @@ def test_init_with_json(self):

expected = utils.normalise_json_content(
{
"dbt_commands": DBT_COMMANDS,
"notebook_params": NOTEBOOK_PARAMS,
"jar_params": JAR_PARAMS,
"python_params": PYTHON_PARAMS,
Expand All @@ -1215,6 +1218,7 @@ def test_init_with_merging(self):
task_id=TASK_ID,
json=json,
job_id=JOB_ID,
dbt_commands=DBT_COMMANDS,
notebook_params=override_notebook_params,
python_params=PYTHON_PARAMS,
jar_params=override_jar_params,
Expand All @@ -1223,6 +1227,7 @@ def test_init_with_merging(self):

expected = utils.normalise_json_content(
{
"dbt_commands": DBT_COMMANDS,
"notebook_params": override_notebook_params,
"jar_params": override_jar_params,
"python_params": PYTHON_PARAMS,
Expand Down

0 comments on commit e7b4937

Please sign in to comment.