diff --git a/providers/src/airflow/providers/google/cloud/hooks/dataproc_metastore.py b/providers/src/airflow/providers/google/cloud/hooks/dataproc_metastore.py
index b0168ddbfbba8..3520397984812 100644
--- a/providers/src/airflow/providers/google/cloud/hooks/dataproc_metastore.py
+++ b/providers/src/airflow/providers/google/cloud/hooks/dataproc_metastore.py
@@ -60,11 +60,16 @@ def get_dataproc_metastore_client_v1beta(self):
 
     def wait_for_operation(self, timeout: float | None, operation: Operation):
         """Wait for long-lasting operation to complete."""
+        self.log.info("Waiting for operation (timeout: %s seconds)", timeout)
+
         try:
-            return operation.result(timeout=timeout)
-        except Exception:
+            result = operation.result(timeout=timeout)
+            self.log.info("Operation completed successfully")
+            return result
+        except Exception as e:
+            self.log.error("Operation failed: %s", str(e))
             error = operation.exception(timeout=timeout)
-            raise AirflowException(error)
+            raise AirflowException(f"Operation failed: {error}")
 
     @GoogleBaseHook.fallback_to_default_project_id
     def create_backup(
@@ -669,23 +674,37 @@ def list_hive_partitions(
         # because dictionaries are ordered since Python 3.7+
         _partitions = list(dict.fromkeys(partition_names)) if partition_names else []
 
-        query = f"""
-            SELECT *
-            FROM PARTITIONS
-            INNER JOIN TBLS
-            ON PARTITIONS.TBL_ID = TBLS.TBL_ID
-            WHERE
-            TBLS.TBL_NAME = '{table}'"""
         if _partitions:
-            query += f"""
-            AND PARTITIONS.PART_NAME IN ({', '.join(f"'{p}'" for p in _partitions)})"""
-        query += ";"
-
-        client = self.get_dataproc_metastore_client_v1beta()
-        result = client.query_metadata(
-            request={
-                "service": f"projects/{project_id}/locations/{region}/services/{service_id}",
-                "query": query,
-            }
-        )
-        return result
+            partition_list = ", ".join(f"'{p}'" for p in _partitions)
+            query = f"""
+                SELECT PARTITIONS.*, TBLS.TBL_TYPE, TBLS.TBL_NAME
+                FROM PARTITIONS
+                INNER JOIN TBLS ON PARTITIONS.TBL_ID = TBLS.TBL_ID
+                WHERE TBLS.TBL_NAME = '{table}'
+                AND PARTITIONS.PART_NAME IN ({partition_list});"""
+        else:
+            query = f"""
+                SELECT PARTITIONS.*, TBLS.TBL_TYPE, TBLS.TBL_NAME
+                FROM PARTITIONS
+                INNER JOIN TBLS ON PARTITIONS.TBL_ID = TBLS.TBL_ID
+                WHERE TBLS.TBL_NAME = '{table}';"""
+
+        request = {
+            "service": f"projects/{project_id}/locations/{region}/services/{service_id}",
+            "query": query,
+        }
+
+        self.log.info("Prepared request:")
+        self.log.info(request)
+
+        # Execute query
+        try:
+            self.log.info("Getting Dataproc Metastore client (v1beta)...")
+            client = self.get_dataproc_metastore_client_v1beta()
+            self.log.info("Executing query_metadata...")
+            result = client.query_metadata(request=request)
+            self.log.info("Query executed successfully")
+            return result
+        except Exception as e:
+            self.log.error("Error executing query_metadata: %s", str(e))
+            raise
diff --git a/providers/tests/google/cloud/hooks/test_dataproc_metastore.py b/providers/tests/google/cloud/hooks/test_dataproc_metastore.py
index 600693fa0f3a3..d31af293d47d1 100644
--- a/providers/tests/google/cloud/hooks/test_dataproc_metastore.py
+++ b/providers/tests/google/cloud/hooks/test_dataproc_metastore.py
@@ -60,20 +60,17 @@
 TEST_PARTITION_NAME = "column=value"
 TEST_SUBPARTITION_NAME = "column1=value1/column2=value2"
 TEST_PARTITIONS_QUERY_ALL = """
-            SELECT *
-            FROM PARTITIONS
-            INNER JOIN TBLS
-            ON PARTITIONS.TBL_ID = TBLS.TBL_ID
-            WHERE
-            TBLS.TBL_NAME = '{}';"""
+                SELECT PARTITIONS.*, TBLS.TBL_TYPE, TBLS.TBL_NAME
+                FROM PARTITIONS
+                INNER JOIN TBLS ON PARTITIONS.TBL_ID = TBLS.TBL_ID
+                WHERE TBLS.TBL_NAME = '{}';"""
+
 TEST_PARTITIONS_QUERY = """
-            SELECT *
-            FROM PARTITIONS
-            INNER JOIN TBLS
-            ON PARTITIONS.TBL_ID = TBLS.TBL_ID
-            WHERE
-            TBLS.TBL_NAME = '{}'
-            AND PARTITIONS.PART_NAME IN ({});"""
+                SELECT PARTITIONS.*, TBLS.TBL_TYPE, TBLS.TBL_NAME
+                FROM PARTITIONS
+                INNER JOIN TBLS ON PARTITIONS.TBL_ID = TBLS.TBL_ID
+                WHERE TBLS.TBL_NAME = '{}'
+                AND PARTITIONS.PART_NAME IN ({});"""
 
 BASE_STRING = "airflow.providers.google.common.hooks.base_google.{}"
 DATAPROC_METASTORE_STRING = "airflow.providers.google.cloud.hooks.dataproc_metastore.{}"
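
As a quick illustration for reviewers, below is a minimal standalone sketch of the query construction this patch introduces. The `build_partitions_query` helper name and the `"test_table"` sample are hypothetical, for illustration only (the patch builds the string inline in `list_hive_partitions`); the partition values are taken from the test constants above.

    from __future__ import annotations

    def build_partitions_query(table: str, partition_names: list[str] | None) -> str:
        """Hypothetical helper mirroring the hook's new inline query construction."""
        # De-duplicate while preserving order, as the hook does (dicts are ordered since 3.7+).
        _partitions = list(dict.fromkeys(partition_names)) if partition_names else []
        if _partitions:
            partition_list = ", ".join(f"'{p}'" for p in _partitions)
            return f"""
                SELECT PARTITIONS.*, TBLS.TBL_TYPE, TBLS.TBL_NAME
                FROM PARTITIONS
                INNER JOIN TBLS ON PARTITIONS.TBL_ID = TBLS.TBL_ID
                WHERE TBLS.TBL_NAME = '{table}'
                AND PARTITIONS.PART_NAME IN ({partition_list});"""
        return f"""
                SELECT PARTITIONS.*, TBLS.TBL_TYPE, TBLS.TBL_NAME
                FROM PARTITIONS
                INNER JOIN TBLS ON PARTITIONS.TBL_ID = TBLS.TBL_ID
                WHERE TBLS.TBL_NAME = '{table}';"""

    # Filtered branch: ... AND PARTITIONS.PART_NAME IN ('column=value', 'column1=value1/column2=value2')
    print(build_partitions_query("test_table", ["column=value", "column1=value1/column2=value2"]))
    # Unfiltered branch: no PART_NAME clause
    print(build_partitions_query("test_table", None))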