From cba3ca70671a1a6dfc2311c7b00659ab0413f746 Mon Sep 17 00:00:00 2001 From: Jarek Potiuk Date: Thu, 28 Dec 2023 00:46:12 +0100 Subject: [PATCH] Remove remaining Airflow 2.5 backcompat code from GCS Task Handler (#36443) (#36457) Co-authored-by: Andrey Anshin (cherry picked from commit 75faf1115d990746784e25280c0b326b3b557b86) --- .../google/cloud/log/gcs_task_handler.py | 41 +------------------ .../google/cloud/log/test_gcs_task_handler.py | 9 ++-- 2 files changed, 5 insertions(+), 45 deletions(-) diff --git a/airflow/providers/google/cloud/log/gcs_task_handler.py b/airflow/providers/google/cloud/log/gcs_task_handler.py index 9921bb8753f03..abc2bc8845b9a 100644 --- a/airflow/providers/google/cloud/log/gcs_task_handler.py +++ b/airflow/providers/google/cloud/log/gcs_task_handler.py @@ -26,7 +26,6 @@ # not sure why but mypy complains on missing `storage` but it is clearly there and is importable from google.cloud import storage # type: ignore[attr-defined] -from packaging.version import Version from airflow.configuration import conf from airflow.exceptions import AirflowNotFoundException @@ -48,18 +47,6 @@ logger = logging.getLogger(__name__) -def get_default_delete_local_copy(): - """Load delete_local_logs conf if Airflow version > 2.6 and return False if not. - - TODO: delete this function when min airflow version >= 2.6. - """ - from airflow.version import version - - if Version(version) < Version("2.6"): - return False - return conf.getboolean("logging", "delete_local_logs") - - class GCSTaskHandler(FileTaskHandler, LoggingMixin): """ GCSTaskHandler is a python log handler that handles and reads task instance logs. @@ -108,8 +95,8 @@ def __init__( self.gcp_keyfile_dict = gcp_keyfile_dict self.scopes = gcp_scopes self.project_id = project_id - self.delete_local_copy = ( - kwargs["delete_local_copy"] if "delete_local_copy" in kwargs else get_default_delete_local_copy() + self.delete_local_copy = kwargs.get( + "delete_local_copy", conf.getboolean("logging", "delete_local_logs") ) @cached_property @@ -218,30 +205,6 @@ def _read_remote_logs(self, ti, try_number, metadata=None) -> tuple[list[str], l messages.append(f"Unable to read remote log {e}") return messages, logs - def _read(self, ti, try_number, metadata=None): - """ - Read logs of given task instance and try_number from GCS. - - If failed, read the log from task instance host machine. - - todo: when min airflow version >= 2.6, remove this method - - :param ti: task instance object - :param try_number: task instance try_number to read logs from - :param metadata: log metadata, - can be used for steaming log reading and auto-tailing. - """ - if hasattr(super(), "_read_remote_logs"): - # from Airflow 2.6, we don't implement the `_read` method. - # if parent has _read_remote_logs, we're >= 2.6 - return super()._read(ti, try_number, metadata) - - messages, logs = self._read_remote_logs(ti, try_number, metadata) - if not logs: - return super()._read(ti, try_number, metadata) - - return "".join([f"*** {x}\n" for x in messages]) + "\n".join(logs), {"end_of_log": True} - def gcs_write(self, log, remote_log_location) -> bool: """ Write the log to the remote location and return `True`; fail silently and return `False` on error. diff --git a/tests/providers/google/cloud/log/test_gcs_task_handler.py b/tests/providers/google/cloud/log/test_gcs_task_handler.py index 2d4dd7340d805..a3e929b985334 100644 --- a/tests/providers/google/cloud/log/test_gcs_task_handler.py +++ b/tests/providers/google/cloud/log/test_gcs_task_handler.py @@ -248,8 +248,8 @@ def test_write_to_remote_on_close_failed_read_old_logs(self, mock_blob, mock_cli ) @pytest.mark.parametrize( - "delete_local_copy, expected_existence_of_local_copy, airflow_version", - [(True, False, "2.6.0"), (False, True, "2.6.0"), (True, True, "2.5.0"), (False, True, "2.5.0")], + "delete_local_copy, expected_existence_of_local_copy", + [(True, False), (False, True)], ) @mock.patch( "airflow.providers.google.cloud.log.gcs_task_handler.get_credentials_and_project_id", @@ -265,12 +265,9 @@ def test_close_with_delete_local_copy_conf( local_log_location, delete_local_copy, expected_existence_of_local_copy, - airflow_version, ): mock_blob.from_string.return_value.download_as_bytes.return_value = b"CONTENT" - with conf_vars({("logging", "delete_local_logs"): str(delete_local_copy)}), mock.patch( - "airflow.version.version", airflow_version - ): + with conf_vars({("logging", "delete_local_logs"): str(delete_local_copy)}): handler = GCSTaskHandler( base_log_folder=local_log_location, gcs_log_folder="gs://bucket/remote/log/location",