Fix MetastoreHivePartitionSensor failing due to duplicate aliases (apache#45001)

* fix MetastoreHivePartitionSensor with updated query to avoid duplicate aliases

* Update TestMetastoreHivePartitionSensor

* Fix MetastoreHivePartitionSensor pytests
CYarros10 authored Dec 18, 2024
1 parent 0ff35c7 commit 553ddf3
Showing 2 changed files with 51 additions and 35 deletions.
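
Background on the bug: in the Hive metastore schema, both PARTITIONS and TBLS carry a TBL_ID column (it is the join key on both sides), so the old `SELECT * FROM PARTITIONS INNER JOIN TBLS ...` returned two output columns named TBL_ID. That is consistent with the "duplicate aliases" failure in the commit title: a query engine that requires unique result aliases rejects the statement. A minimal before/after sketch of the two query shapes (the table name my_table is illustrative):

    # Old shape: SELECT * over the join emits TBL_ID twice (once per table),
    # tripping duplicate-alias validation in the query engine.
    old_query = """
        SELECT *
        FROM PARTITIONS
        INNER JOIN TBLS ON PARTITIONS.TBL_ID = TBLS.TBL_ID
        WHERE TBLS.TBL_NAME = 'my_table';"""

    # New shape: project explicit columns so every output alias is unique;
    # TBLS contributes only TBL_TYPE and TBL_NAME alongside PARTITIONS.*.
    new_query = """
        SELECT PARTITIONS.*, TBLS.TBL_TYPE, TBLS.TBL_NAME
        FROM PARTITIONS
        INNER JOIN TBLS ON PARTITIONS.TBL_ID = TBLS.TBL_ID
        WHERE TBLS.TBL_NAME = 'my_table';"""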
providers/src/airflow/providers/google/cloud/hooks/dataproc_metastore.py

@@ -60,11 +60,16 @@ def get_dataproc_metastore_client_v1beta(self):

     def wait_for_operation(self, timeout: float | None, operation: Operation):
         """Wait for long-lasting operation to complete."""
+        self.log.info("Waiting for operation (timeout: %s seconds)", timeout)
+
         try:
-            return operation.result(timeout=timeout)
-        except Exception:
+            result = operation.result(timeout=timeout)
+            self.log.info("Operation completed successfully")
+            return result
+        except Exception as e:
+            self.log.error("Operation failed: %s", str(e))
             error = operation.exception(timeout=timeout)
-            raise AirflowException(error)
+            raise AirflowException(f"Operation failed: {error}")

     @GoogleBaseHook.fallback_to_default_project_id
     def create_backup(
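
A hedged usage sketch of the reworked error path (the hook and operation names here are hypothetical stand-ins, not from the commit):

    from airflow.exceptions import AirflowException

    # Sketch: a failed long-running operation now surfaces as an
    # AirflowException whose message embeds the operation's own error.
    try:
        result = hook.wait_for_operation(timeout=600, operation=op)  # hook/op: hypothetical
    except AirflowException as err:
        print(err)  # e.g. "Operation failed: <error recorded on the operation>"

The change also logs the raw client-side exception before re-raising, so the task log shows both the local failure and the error the operation itself recorded.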
@@ -669,23 +674,37 @@ def list_hive_partitions(

         # because dictionaries are ordered since Python 3.7+
         _partitions = list(dict.fromkeys(partition_names)) if partition_names else []

-        query = f"""
-            SELECT *
-            FROM PARTITIONS
-            INNER JOIN TBLS
-            ON PARTITIONS.TBL_ID = TBLS.TBL_ID
-            WHERE
-                TBLS.TBL_NAME = '{table}'"""
         if _partitions:
-            query += f"""
-                AND PARTITIONS.PART_NAME IN ({', '.join(f"'{p}'" for p in _partitions)})"""
-        query += ";"
-
-        client = self.get_dataproc_metastore_client_v1beta()
-        result = client.query_metadata(
-            request={
-                "service": f"projects/{project_id}/locations/{region}/services/{service_id}",
-                "query": query,
-            }
-        )
-        return result
+            partition_list = ", ".join(f"'{p}'" for p in _partitions)
+            query = f"""
+                SELECT PARTITIONS.*, TBLS.TBL_TYPE, TBLS.TBL_NAME
+                FROM PARTITIONS
+                INNER JOIN TBLS ON PARTITIONS.TBL_ID = TBLS.TBL_ID
+                WHERE TBLS.TBL_NAME = '{table}'
+                AND PARTITIONS.PART_NAME IN ({partition_list});"""
+        else:
+            query = f"""
+                SELECT PARTITIONS.*, TBLS.TBL_TYPE, TBLS.TBL_NAME
+                FROM PARTITIONS
+                INNER JOIN TBLS ON PARTITIONS.TBL_ID = TBLS.TBL_ID
+                WHERE TBLS.TBL_NAME = '{table}';"""
+
+        request = {
+            "service": f"projects/{project_id}/locations/{region}/services/{service_id}",
+            "query": query,
+        }
+
+        self.log.info("Prepared request:")
+        self.log.info(request)
+
+        # Execute query
+        try:
+            self.log.info("Getting Dataproc Metastore client (v1beta)...")
+            client = self.get_dataproc_metastore_client_v1beta()
+            self.log.info("Executing query_metadata...")
+            result = client.query_metadata(request=request)
+            self.log.info("Query executed successfully")
+            return result
+        except Exception as e:
+            self.log.error("Error executing query_metadata: %s", str(e))
+            raise
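
For concreteness, a sketch of what the new branch builds for sample inputs (the partition values are illustrative):

    # Order-preserving dedup, exactly as the hook does
    # (dicts keep insertion order on Python 3.7+).
    partition_names = ["ds=2024-12-18", "ds=2024-12-17", "ds=2024-12-18"]
    _partitions = list(dict.fromkeys(partition_names))
    # -> ['ds=2024-12-18', 'ds=2024-12-17']

    partition_list = ", ".join(f"'{p}'" for p in _partitions)
    # -> "'ds=2024-12-18', 'ds=2024-12-17'"
    # With partitions, the WHERE clause gains:
    #   AND PARTITIONS.PART_NAME IN ('ds=2024-12-18', 'ds=2024-12-17');
    # With an empty list, the else branch omits the IN clause entirely.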
providers/tests/google/cloud/hooks/test_dataproc_metastore.py (23 changes: 10 additions & 13 deletions)

@@ -60,20 +60,17 @@
 TEST_PARTITION_NAME = "column=value"
 TEST_SUBPARTITION_NAME = "column1=value1/column2=value2"
 TEST_PARTITIONS_QUERY_ALL = """
-    SELECT *
-    FROM PARTITIONS
-    INNER JOIN TBLS
-    ON PARTITIONS.TBL_ID = TBLS.TBL_ID
-    WHERE
-        TBLS.TBL_NAME = '{}';"""
+    SELECT PARTITIONS.*, TBLS.TBL_TYPE, TBLS.TBL_NAME
+    FROM PARTITIONS
+    INNER JOIN TBLS ON PARTITIONS.TBL_ID = TBLS.TBL_ID
+    WHERE TBLS.TBL_NAME = '{}';"""

 TEST_PARTITIONS_QUERY = """
-    SELECT *
-    FROM PARTITIONS
-    INNER JOIN TBLS
-    ON PARTITIONS.TBL_ID = TBLS.TBL_ID
-    WHERE
-        TBLS.TBL_NAME = '{}'
-    AND PARTITIONS.PART_NAME IN ({});"""
+    SELECT PARTITIONS.*, TBLS.TBL_TYPE, TBLS.TBL_NAME
+    FROM PARTITIONS
+    INNER JOIN TBLS ON PARTITIONS.TBL_ID = TBLS.TBL_ID
+    WHERE TBLS.TBL_NAME = '{}'
+    AND PARTITIONS.PART_NAME IN ({});"""
 BASE_STRING = "airflow.providers.google.common.hooks.base_google.{}"
 DATAPROC_METASTORE_STRING = "airflow.providers.google.cloud.hooks.dataproc_metastore.{}"