diff --git a/airflow/providers/amazon/aws/log/cloudwatch_task_handler.py b/airflow/providers/amazon/aws/log/cloudwatch_task_handler.py
index 3a801093ca0bd..865b30233db3e 100644
--- a/airflow/providers/amazon/aws/log/cloudwatch_task_handler.py
+++ b/airflow/providers/amazon/aws/log/cloudwatch_task_handler.py
@@ -17,7 +17,7 @@
 # under the License.
 from __future__ import annotations
 
-from datetime import date, datetime, timedelta
+from datetime import date, datetime, timedelta, timezone
 from functools import cached_property
 from typing import TYPE_CHECKING, Any
 
@@ -163,7 +163,7 @@ def get_cloudwatch_logs(self, stream_name: str, task_instance: TaskInstance) ->
         return "\n".join(self._event_to_str(event) for event in events)
 
     def _event_to_str(self, event: dict) -> str:
-        event_dt = datetime.utcfromtimestamp(event["timestamp"] / 1000.0)
+        event_dt = datetime.fromtimestamp(event["timestamp"] / 1000.0, tz=timezone.utc)
         formatted_event_dt = event_dt.strftime("%Y-%m-%d %H:%M:%S,%f")[:-3]
         message = event["message"]
         return f"[{formatted_event_dt}] {message}"
diff --git a/airflow/providers/amazon/aws/utils/task_log_fetcher.py b/airflow/providers/amazon/aws/utils/task_log_fetcher.py
index 63110edadad96..a4cad6c099c2f 100644
--- a/airflow/providers/amazon/aws/utils/task_log_fetcher.py
+++ b/airflow/providers/amazon/aws/utils/task_log_fetcher.py
@@ -18,7 +18,7 @@
 from __future__ import annotations
 
 import time
-from datetime import datetime, timedelta
+from datetime import datetime, timedelta, timezone
 from threading import Event, Thread
 from typing import TYPE_CHECKING, Generator
 
@@ -87,7 +87,7 @@ def _get_log_events(self, skip_token: AwsLogsHook.ContinuationToken | None = Non
 
     @staticmethod
     def event_to_str(event: dict) -> str:
-        event_dt = datetime.utcfromtimestamp(event["timestamp"] / 1000.0)
+        event_dt = datetime.fromtimestamp(event["timestamp"] / 1000.0, tz=timezone.utc)
         formatted_event_dt = event_dt.strftime("%Y-%m-%d %H:%M:%S,%f")[:-3]
         message = event["message"]
         return f"[{formatted_event_dt}] {message}"
diff --git a/airflow/providers/google/cloud/transfers/s3_to_gcs.py b/airflow/providers/google/cloud/transfers/s3_to_gcs.py
index cc745059a1065..92dc97b4cc670 100644
--- a/airflow/providers/google/cloud/transfers/s3_to_gcs.py
+++ b/airflow/providers/google/cloud/transfers/s3_to_gcs.py
@@ -17,7 +17,7 @@
 # under the License.
 from __future__ import annotations
 
-from datetime import datetime
+from datetime import datetime, timezone
 from tempfile import NamedTemporaryFile
 from typing import TYPE_CHECKING, Any, Sequence
 
@@ -276,7 +276,7 @@ def transfer_files_async(self, files: list[str], gcs_hook: GCSHook, s3_hook: S3H
         )
 
     def submit_transfer_jobs(self, files: list[str], gcs_hook: GCSHook, s3_hook: S3Hook) -> list[str]:
-        now = datetime.utcnow()
+        now = datetime.now(tz=timezone.utc)
         one_time_schedule = {"day": now.day, "month": now.month, "year": now.year}
         gcs_bucket, gcs_prefix = _parse_gcs_url(self.dest_gcs)
diff --git a/airflow/providers/google/common/hooks/base_google.py b/airflow/providers/google/common/hooks/base_google.py
index a83524422a82c..bee2798267610 100644
--- a/airflow/providers/google/common/hooks/base_google.py
+++ b/airflow/providers/google/common/hooks/base_google.py
@@ -674,7 +674,7 @@ async def acquire_access_token(self, timeout: int = 10) -> None:
         self.access_token = cast(str, self.credentials.token)
         self.access_token_duration = 3600
-        self.access_token_acquired_at = datetime.datetime.utcnow()
+        self.access_token_acquired_at = datetime.datetime.now(tz=datetime.timezone.utc)
         self.acquiring = None
diff --git a/kubernetes_tests/test_base.py b/kubernetes_tests/test_base.py
index 715bbbb237a3b..17d1c9954301a 100644
--- a/kubernetes_tests/test_base.py
+++ b/kubernetes_tests/test_base.py
@@ -20,7 +20,7 @@
 import subprocess
 import tempfile
 import time
-from datetime import datetime
+from datetime import datetime, timezone
 from pathlib import Path
 from subprocess import check_call, check_output
 
@@ -77,7 +77,7 @@ def _describe_resources(self, namespace: str):
         with open(output_file_path, "w") as output_file:
             print("=" * 80, file=output_file)
             print(f"Describe resources for namespace {namespace}", file=output_file)
-            print(f"Datetime: {datetime.utcnow()}", file=output_file)
+            print(f"Datetime: {datetime.now(tz=timezone.utc)}", file=output_file)
             print("=" * 80, file=output_file)
             print("Describing pods", file=output_file)
             print("-" * 80, file=output_file)
diff --git a/tests/providers/amazon/aws/hooks/test_sagemaker.py b/tests/providers/amazon/aws/hooks/test_sagemaker.py
index 7c3549c9b7bf5..5cdf6438caf28 100644
--- a/tests/providers/amazon/aws/hooks/test_sagemaker.py
+++ b/tests/providers/amazon/aws/hooks/test_sagemaker.py
@@ -18,7 +18,7 @@
 from __future__ import annotations
 
 import time
-from datetime import datetime
+from datetime import datetime, timezone
 from unittest import mock
 from unittest.mock import patch
@@ -528,7 +528,7 @@ def test_secondary_training_status_changed_false(self):
     def test_secondary_training_status_message_status_changed(self):
         now = datetime.now(tzlocal())
         SECONDARY_STATUS_DESCRIPTION_1["LastModifiedTime"] = now
-        expected_time = datetime.utcfromtimestamp(time.mktime(now.timetuple())).strftime("%Y-%m-%d %H:%M:%S")
+        expected_time = now.astimezone(tz=timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
         expected = f"{expected_time} {status} - {message}"
         assert (
             secondary_training_status_message(SECONDARY_STATUS_DESCRIPTION_1, SECONDARY_STATUS_DESCRIPTION_2)
diff --git a/tests/providers/amazon/aws/log/test_cloudwatch_task_handler.py b/tests/providers/amazon/aws/log/test_cloudwatch_task_handler.py
index 7ceb42348191b..4f9ecb1664339 100644
--- a/tests/providers/amazon/aws/log/test_cloudwatch_task_handler.py
+++ b/tests/providers/amazon/aws/log/test_cloudwatch_task_handler.py
@@ -19,7 +19,7 @@
 
 import logging
 import time
-from datetime import datetime as dt, timedelta
+from datetime import datetime as dt, timedelta, timezone
 from unittest import mock
 from unittest.mock import ANY, Mock, call
 
@@ -40,7 +40,7 @@
 
 
 def get_time_str(time_in_milliseconds):
-    dt_time = dt.utcfromtimestamp(time_in_milliseconds / 1000.0)
+    dt_time = dt.fromtimestamp(time_in_milliseconds / 1000.0, tz=timezone.utc)
     return dt_time.strftime("%Y-%m-%d %H:%M:%S,000")
diff --git a/tests/providers/celery/executors/test_celery_executor.py b/tests/providers/celery/executors/test_celery_executor.py
index 450312bd63198..a32d710048dbb 100644
--- a/tests/providers/celery/executors/test_celery_executor.py
+++ b/tests/providers/celery/executors/test_celery_executor.py
@@ -22,7 +22,7 @@
 import os
 import signal
 import sys
-from datetime import datetime, timedelta
+from datetime import timedelta
 from unittest import mock
 
 # leave this it is used by the test worker
@@ -183,7 +183,7 @@ def test_command_validation(self, command, raise_exception):
 
     @pytest.mark.backend("mysql", "postgres")
     def test_try_adopt_task_instances_none(self):
-        start_date = datetime.utcnow() - timedelta(days=2)
+        start_date = timezone.utcnow() - timedelta(days=2)
 
         with DAG("test_try_adopt_task_instances_none"):
             task_1 = BaseOperator(task_id="task_1", start_date=start_date)
diff --git a/tests/providers/cncf/kubernetes/executors/test_kubernetes_executor.py b/tests/providers/cncf/kubernetes/executors/test_kubernetes_executor.py
index d74f4636c84c9..d00342fd98fc1 100644
--- a/tests/providers/cncf/kubernetes/executors/test_kubernetes_executor.py
+++ b/tests/providers/cncf/kubernetes/executors/test_kubernetes_executor.py
@@ -573,7 +573,7 @@ def test_invalid_executor_config(self, mock_get_kube_client, mock_kubernetes_job
         try:
             assert executor.event_buffer == {}
             executor.execute_async(
-                key=("dag", "task", datetime.utcnow(), 1),
+                key=("dag", "task", timezone.utcnow(), 1),
                 queue=None,
                 command=["airflow", "tasks", "run", "true", "some_parameter"],
                 executor_config=k8s.V1Pod(
@@ -1540,7 +1540,7 @@ def test_process_status_pending(self):
 
     def test_process_status_pending_deleted(self):
         self.events.append({"type": "DELETED", "object": self.pod})
-        self.pod.metadata.deletion_timestamp = datetime.utcnow()
+        self.pod.metadata.deletion_timestamp = timezone.utcnow()
 
         self._run()
         self.assert_watcher_queue_called_once_with_state(State.FAILED)
@@ -1570,7 +1570,7 @@ def test_process_status_succeeded_dedup_label(self):
 
     def test_process_status_succeeded_dedup_timestamp(self):
         self.pod.status.phase = "Succeeded"
-        self.pod.metadata.deletion_timestamp = datetime.utcnow()
+        self.pod.metadata.deletion_timestamp = timezone.utcnow()
         self.events.append({"type": "MODIFIED", "object": self.pod})
 
         self._run()
@@ -1604,7 +1604,7 @@ def test_process_status_pod_adopted(self, ti_state):
 
     def test_process_status_running_deleted(self):
         self.pod.status.phase = "Running"
-        self.pod.metadata.deletion_timestamp = datetime.utcnow()
+        self.pod.metadata.deletion_timestamp = timezone.utcnow()
         self.events.append({"type": "DELETED", "object": self.pod})
 
         self._run()
diff --git a/tests/providers/google/cloud/sensors/test_gcs.py b/tests/providers/google/cloud/sensors/test_gcs.py
index 37697ff58d735..641b2052fdd66 100644
--- a/tests/providers/google/cloud/sensors/test_gcs.py
+++ b/tests/providers/google/cloud/sensors/test_gcs.py
@@ -17,7 +17,7 @@
 # under the License.
 from __future__ import annotations
 
-from datetime import datetime, timedelta, timezone
+from datetime import datetime, timedelta
 from unittest import mock
 
 import pendulum
@@ -45,6 +45,7 @@
     GCSPrefixBlobTrigger,
     GCSUploadSessionTrigger,
 )
+from airflow.utils import timezone
 
 
 TEST_BUCKET = "TEST_BUCKET"
@@ -67,15 +68,6 @@
 TEST_MIN_OBJECTS = 1
 
 
-@pytest.fixture()
-def context():
-    """
-    Creates an empty context.
-    """
-    context = {"data_interval_end": datetime.utcnow()}
-    yield context
-
-
 def next_time_side_effect():
     """
     This each time this is called mock a time 10 seconds later
@@ -159,7 +151,7 @@ def test_gcs_object_existence_sensor_deferred(self, mock_hook):
         )
         mock_hook.return_value.exists.return_value = False
         with pytest.raises(TaskDeferred) as exc:
-            task.execute(context)
+            task.execute({})
         assert isinstance(exc.value.trigger, GCSBlobTrigger), "Trigger is not a GCSBlobTrigger"
 
     @pytest.mark.parametrize(
@@ -213,7 +205,7 @@ def test_gcs_object_existence_async_sensor(self, mock_hook):
         )
         mock_hook.return_value.exists.return_value = False
         with pytest.raises(TaskDeferred) as exc:
-            task.execute(context)
+            task.execute({})
         assert isinstance(exc.value.trigger, GCSBlobTrigger), "Trigger is not a GCSBlobTrigger"
 
     @pytest.mark.parametrize(
@@ -329,20 +321,20 @@ def test_gcs_object_update_async_sensor(self, mock_hook):
     @pytest.mark.parametrize(
         "soft_fail, expected_exception", ((False, AirflowException), (True, AirflowSkipException))
     )
-    def test_gcs_object_update_async_sensor_execute_failure(self, context, soft_fail, expected_exception):
+    def test_gcs_object_update_async_sensor_execute_failure(self, soft_fail, expected_exception):
         """Tests that an AirflowException is raised in case of error event"""
         self.OPERATOR.soft_fail = soft_fail
         with pytest.raises(expected_exception):
             self.OPERATOR.execute_complete(
-                context=context, event={"status": "error", "message": "test failure message"}
+                context={}, event={"status": "error", "message": "test failure message"}
             )
 
-    def test_gcs_object_update_async_sensor_execute_complete(self, context):
+    def test_gcs_object_update_async_sensor_execute_complete(self):
         """Asserts that logging occurs as expected"""
         with mock.patch.object(self.OPERATOR.log, "info") as mock_log_info:
             self.OPERATOR.execute_complete(
-                context=context, event={"status": "success", "message": "Job completed"}
+                context={}, event={"status": "success", "message": "Job completed"}
             )
         mock_log_info.assert_called_with(
             "Checking last updated time for object %s in bucket : %s", TEST_OBJECT, TEST_BUCKET
         )
@@ -462,21 +454,21 @@ def test_gcs_object_with_prefix_existence_async_sensor(self, mock_hook):
         "soft_fail, expected_exception", ((False, AirflowException), (True, AirflowSkipException))
     )
     def test_gcs_object_with_prefix_existence_async_sensor_execute_failure(
-        self, context, soft_fail, expected_exception
+        self, soft_fail, expected_exception
     ):
         """Tests that an AirflowException is raised in case of error event"""
         self.OPERATOR.soft_fail = soft_fail
         with pytest.raises(expected_exception):
             self.OPERATOR.execute_complete(
-                context=context, event={"status": "error", "message": "test failure message"}
+                context={}, event={"status": "error", "message": "test failure message"}
             )
 
-    def test_gcs_object_with_prefix_existence_async_sensor_execute_complete(self, context):
+    def test_gcs_object_with_prefix_existence_async_sensor_execute_complete(self):
         """Asserts that logging occurs as expected"""
        with mock.patch.object(self.OPERATOR.log, "info") as mock_log_info:
             self.OPERATOR.execute_complete(
-                context=context,
+                context={},
                 event={"status": "success", "message": "Job completed", "matches": [TEST_OBJECT]},
             )
         mock_log_info.assert_called_with("Resuming from trigger and checking status")
 
@@ -609,18 +601,16 @@ def test_gcs_upload_session_complete_async_sensor(self, mock_hook):
     @pytest.mark.parametrize(
         "soft_fail, expected_exception", ((False, AirflowException), (True, AirflowSkipException))
     )
-    def test_gcs_upload_session_complete_sensor_execute_failure(self, context, soft_fail, expected_exception):
+    def test_gcs_upload_session_complete_sensor_execute_failure(self, soft_fail, expected_exception):
        """Tests that an AirflowException is raised in case of error event"""
         self.OPERATOR.soft_fail = soft_fail
         with pytest.raises(expected_exception):
             self.OPERATOR.execute_complete(
-                context=context, event={"status": "error", "message": "test failure message"}
+                context={}, event={"status": "error", "message": "test failure message"}
             )
 
-    def test_gcs_upload_session_complete_async_sensor_execute_complete(self, context):
+    def test_gcs_upload_session_complete_async_sensor_execute_complete(self):
         """Asserts that execute complete is completed as expected"""
-        assert self.OPERATOR.execute_complete(
-            context=context, event={"status": "success", "message": "success"}
-        )
+        assert self.OPERATOR.execute_complete(context={}, event={"status": "success", "message": "success"})
diff --git a/tests/providers/google/cloud/triggers/test_gcs.py b/tests/providers/google/cloud/triggers/test_gcs.py
index 3c4bc9031a8de..97a5597d978d5 100644
--- a/tests/providers/google/cloud/triggers/test_gcs.py
+++ b/tests/providers/google/cloud/triggers/test_gcs.py
@@ -18,7 +18,7 @@
 from __future__ import annotations
 
 import asyncio
-from datetime import datetime, timedelta
+from datetime import datetime, timedelta, timezone
 from typing import Any
 from unittest import mock
 from unittest.mock import AsyncMock
@@ -41,7 +41,7 @@
 TEST_GCP_CONN_ID = "TEST_GCP_CONN_ID"
 TEST_POLLING_INTERVAL = 3.0
 TEST_HOOK_PARAMS: dict[str, Any] = {}
-TEST_TS_OBJECT = datetime.utcnow()
+TEST_TS_OBJECT = datetime.now(tz=timezone.utc)
 TEST_INACTIVITY_PERIOD = 5.0
diff --git a/tests/system/providers/google/cloud/storage_transfer/example_cloud_storage_transfer_service_aws.py b/tests/system/providers/google/cloud/storage_transfer/example_cloud_storage_transfer_service_aws.py
index dee9ce7282400..ba64050997414 100644
--- a/tests/system/providers/google/cloud/storage_transfer/example_cloud_storage_transfer_service_aws.py
+++ b/tests/system/providers/google/cloud/storage_transfer/example_cloud_storage_transfer_service_aws.py
@@ -22,7 +22,7 @@
 
 import os
 from copy import deepcopy
-from datetime import datetime, timedelta
+from datetime import datetime, timedelta, timezone
 
 from airflow.models.dag import DAG
 from airflow.providers.amazon.aws.operators.s3 import S3CreateBucketOperator, S3DeleteBucketOperator
@@ -86,7 +86,7 @@
     SCHEDULE: {
         SCHEDULE_START_DATE: datetime(2015, 1, 1).date(),
         SCHEDULE_END_DATE: datetime(2030, 1, 1).date(),
-        START_TIME_OF_DAY: (datetime.utcnow() + timedelta(minutes=1)).time(),
+        START_TIME_OF_DAY: (datetime.now(tz=timezone.utc) + timedelta(minutes=1)).time(),
     },
     TRANSFER_SPEC: {
         AWS_S3_DATA_SOURCE: {BUCKET_NAME: BUCKET_SOURCE_AWS},
diff --git a/tests/system/providers/google/cloud/storage_transfer/example_cloud_storage_transfer_service_gcp.py b/tests/system/providers/google/cloud/storage_transfer/example_cloud_storage_transfer_service_gcp.py
index 67159b8747ea6..b173abfffa380 100644
--- a/tests/system/providers/google/cloud/storage_transfer/example_cloud_storage_transfer_service_gcp.py
+++ b/tests/system/providers/google/cloud/storage_transfer/example_cloud_storage_transfer_service_gcp.py
@@ -23,7 +23,7 @@
 from __future__ import annotations
 
 import os
-from datetime import datetime, timedelta
+from datetime import datetime, timedelta, timezone
 from pathlib import Path
 
 from airflow.models.dag import DAG
@@ -83,7 +83,7 @@
     SCHEDULE: {
         SCHEDULE_START_DATE: datetime(2015, 1, 1).date(),
         SCHEDULE_END_DATE: datetime(2030, 1, 1).date(),
-        START_TIME_OF_DAY: (datetime.utcnow() + timedelta(seconds=120)).time(),
+        START_TIME_OF_DAY: (datetime.now(tz=timezone.utc) + timedelta(seconds=120)).time(),
     },
     TRANSFER_SPEC: {
         GCS_DATA_SOURCE: {BUCKET_NAME: BUCKET_NAME_SRC},
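Reviewer note, not part of the patch: every hunk above applies the same substitution (naive UTC helpers replaced by timezone-aware calls), so a minimal standalone sketch of the before/after behaviour may help. The event dict and timestamp value below are illustrative only.

    from datetime import datetime, timezone

    # Old pattern: naive datetimes with no tzinfo; utcnow()/utcfromtimestamp() are deprecated as of Python 3.12.
    naive = datetime.utcnow()
    print(naive.tzinfo)  # None

    # New pattern: timezone-aware datetimes pinned to UTC.
    aware = datetime.now(tz=timezone.utc)
    print(aware.tzinfo)  # UTC

    # Same idea for the epoch-millisecond log events handled above.
    event = {"timestamp": 1_700_000_000_000, "message": "example"}
    event_dt = datetime.fromtimestamp(event["timestamp"] / 1000.0, tz=timezone.utc)
    print(f"[{event_dt.strftime('%Y-%m-%d %H:%M:%S,%f')[:-3]}] {event['message']}")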