Skip to content

Commit

Permalink
chore: remove deprecated bigquery facets from OpenLineage utils (apache#44838)
Browse files Browse the repository at this point in the history

Signed-off-by: Kacper Muda <[email protected]>
  • Loading branch information
kacpermuda authored Dec 11, 2024
1 parent 8480460 commit 7f1d54a
Show file tree
Hide file tree
Showing 5 changed files with 3 additions and 70 deletions.

This file was deleted.

14 changes: 3 additions & 11 deletions providers/src/airflow/providers/google/cloud/openlineage/mixins.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,10 +108,7 @@ def get_openlineage_facets_on_complete(self, _):

def get_facets(self, job_id: str):
from airflow.providers.common.compat.openlineage.facet import ErrorMessageRunFacet
from airflow.providers.google.cloud.openlineage.utils import (
BigQueryErrorRunFacet,
get_from_nullable_chain,
)
from airflow.providers.google.cloud.openlineage.utils import get_from_nullable_chain

inputs = []
outputs = []
Expand All @@ -125,8 +122,7 @@ def get_facets(self, job_id: str):
if get_from_nullable_chain(props, ["status", "state"]) != "DONE":
raise ValueError(f"Trying to extract data from running bigquery job: `{job_id}`")

# TODO: remove bigQuery_job in next release
run_facets["bigQuery_job"] = run_facets["bigQueryJob"] = self._get_bigquery_job_run_facet(props)
run_facets["bigQueryJob"] = self._get_bigquery_job_run_facet(props)

if get_from_nullable_chain(props, ["statistics", "numChildJobs"]):
if hasattr(self, "log"):
Expand All @@ -145,16 +141,12 @@ def get_facets(self, job_id: str):
if hasattr(self, "log"):
self.log.warning("Cannot retrieve job details from BigQuery.Client. %s", e, exc_info=True)
exception_msg = traceback.format_exc()
# TODO: remove BigQueryErrorRunFacet in next release
run_facets.update(
{
"errorMessage": ErrorMessageRunFacet(
message=f"{e}: {exception_msg}",
programmingLanguage="python",
),
"bigQuery_error": BigQueryErrorRunFacet(
clientError=f"{e}: {exception_msg}",
),
)
}
)
deduplicated_outputs = self._deduplicate_outputs(outputs)
Expand Down
22 changes: 0 additions & 22 deletions providers/src/airflow/providers/google/cloud/openlineage/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -218,28 +218,6 @@ def _get_schema() -> str:
)


# TODO: remove BigQueryErrorRunFacet in next release
@define
class BigQueryErrorRunFacet(RunFacet):
"""
Represents errors that can happen during execution of BigqueryExtractor.
:param clientError: represents errors originating in bigquery client
:param parserError: represents errors that happened during parsing SQL provided to bigquery
"""

clientError: str | None = field(default=None)
parserError: str | None = field(default=None)

@staticmethod
def _get_schema() -> str:
return (
"https://raw.githubusercontent.com/apache/airflow/"
f"providers-google/{provider_version}/airflow/providers/google/"
"openlineage/BigQueryErrorRunFacet.json"
)


def get_from_nullable_chain(source: Any, chain: list[str]) -> Any | None:
"""
Get object from nested structure of objects, where it's not guaranteed that all keys in the nested structure exist.
Expand Down
6 changes: 0 additions & 6 deletions providers/tests/google/cloud/openlineage/test_mixins.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,6 @@ def test_bq_job_information(self):

self.job_details["configuration"]["query"].pop("query")
assert lineage.run_facets == {
"bigQuery_job": BigQueryJobRunFacet(
cached=False, billedBytes=111149056, properties=json.dumps(self.job_details)
),
"bigQueryJob": BigQueryJobRunFacet(
cached=False, billedBytes=111149056, properties=json.dumps(self.job_details)
),
Expand Down Expand Up @@ -136,9 +133,6 @@ def test_bq_script_job_information(self):
"bigQueryJob": BigQueryJobRunFacet(
cached=False, billedBytes=120586240, properties=json.dumps(self.script_job_details)
),
"bigQuery_job": BigQueryJobRunFacet(
cached=False, billedBytes=120586240, properties=json.dumps(self.script_job_details)
),
"externalQuery": ExternalQueryRunFacet(externalQueryId="job_id", source="bigquery"),
}
assert lineage.inputs == [
Expand Down
1 change: 0 additions & 1 deletion providers/tests/google/cloud/operators/test_bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -1614,7 +1614,6 @@ def test_execute_openlineage_events(self, mock_hook):
]

assert lineage.run_facets == {
"bigQuery_job": mock.ANY,
"bigQueryJob": mock.ANY,
"externalQuery": ExternalQueryRunFacet(externalQueryId=mock.ANY, source="bigquery"),
}
Expand Down

0 comments on commit 7f1d54a

Please sign in to comment.