Skip to content

Commit

Permalink
Replace usage of datetime.utcnow and datetime.utcfromtimestamp in…
Browse files Browse the repository at this point in the history
… providers (apache#37138)

* Replace usage of `datetime.utcnow` and `datetime.utcfromtimestamp` in providers

* Apply suggested changes

Co-authored-by: Tzu-ping Chung <[email protected]>

* Remove redundand context

Co-authored-by: Tzu-ping Chung <[email protected]>

---------

Co-authored-by: Tzu-ping Chung <[email protected]>
  • Loading branch information
Taragolis and uranusjr authored Feb 15, 2024
1 parent 3c1d051 commit 42f8d04
Show file tree
Hide file tree
Showing 13 changed files with 41 additions and 51 deletions.
4 changes: 2 additions & 2 deletions airflow/providers/amazon/aws/log/cloudwatch_task_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
# under the License.
from __future__ import annotations

from datetime import date, datetime, timedelta
from datetime import date, datetime, timedelta, timezone
from functools import cached_property
from typing import TYPE_CHECKING, Any

Expand Down Expand Up @@ -163,7 +163,7 @@ def get_cloudwatch_logs(self, stream_name: str, task_instance: TaskInstance) ->
return "\n".join(self._event_to_str(event) for event in events)

def _event_to_str(self, event: dict) -> str:
event_dt = datetime.utcfromtimestamp(event["timestamp"] / 1000.0)
event_dt = datetime.fromtimestamp(event["timestamp"] / 1000.0, tz=timezone.utc)
formatted_event_dt = event_dt.strftime("%Y-%m-%d %H:%M:%S,%f")[:-3]
message = event["message"]
return f"[{formatted_event_dt}] {message}"
4 changes: 2 additions & 2 deletions airflow/providers/amazon/aws/utils/task_log_fetcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
from __future__ import annotations

import time
from datetime import datetime, timedelta
from datetime import datetime, timedelta, timezone
from threading import Event, Thread
from typing import TYPE_CHECKING, Generator

Expand Down Expand Up @@ -87,7 +87,7 @@ def _get_log_events(self, skip_token: AwsLogsHook.ContinuationToken | None = Non

@staticmethod
def event_to_str(event: dict) -> str:
event_dt = datetime.utcfromtimestamp(event["timestamp"] / 1000.0)
event_dt = datetime.fromtimestamp(event["timestamp"] / 1000.0, tz=timezone.utc)
formatted_event_dt = event_dt.strftime("%Y-%m-%d %H:%M:%S,%f")[:-3]
message = event["message"]
return f"[{formatted_event_dt}] {message}"
Expand Down
4 changes: 2 additions & 2 deletions airflow/providers/google/cloud/transfers/s3_to_gcs.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
# under the License.
from __future__ import annotations

from datetime import datetime
from datetime import datetime, timezone
from tempfile import NamedTemporaryFile
from typing import TYPE_CHECKING, Any, Sequence

Expand Down Expand Up @@ -276,7 +276,7 @@ def transfer_files_async(self, files: list[str], gcs_hook: GCSHook, s3_hook: S3H
)

def submit_transfer_jobs(self, files: list[str], gcs_hook: GCSHook, s3_hook: S3Hook) -> list[str]:
now = datetime.utcnow()
now = datetime.now(tz=timezone.utc)
one_time_schedule = {"day": now.day, "month": now.month, "year": now.year}

gcs_bucket, gcs_prefix = _parse_gcs_url(self.dest_gcs)
Expand Down
2 changes: 1 addition & 1 deletion airflow/providers/google/common/hooks/base_google.py
Original file line number Diff line number Diff line change
Expand Up @@ -674,7 +674,7 @@ async def acquire_access_token(self, timeout: int = 10) -> None:

self.access_token = cast(str, self.credentials.token)
self.access_token_duration = 3600
self.access_token_acquired_at = datetime.datetime.utcnow()
self.access_token_acquired_at = datetime.datetime.now(tz=datetime.timezone.utc)
self.acquiring = None


Expand Down
4 changes: 2 additions & 2 deletions kubernetes_tests/test_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import subprocess
import tempfile
import time
from datetime import datetime
from datetime import datetime, timezone
from pathlib import Path
from subprocess import check_call, check_output

Expand Down Expand Up @@ -77,7 +77,7 @@ def _describe_resources(self, namespace: str):
with open(output_file_path, "w") as output_file:
print("=" * 80, file=output_file)
print(f"Describe resources for namespace {namespace}", file=output_file)
print(f"Datetime: {datetime.utcnow()}", file=output_file)
print(f"Datetime: {datetime.now(tz=timezone.utc)}", file=output_file)
print("=" * 80, file=output_file)
print("Describing pods", file=output_file)
print("-" * 80, file=output_file)
Expand Down
4 changes: 2 additions & 2 deletions tests/providers/amazon/aws/hooks/test_sagemaker.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
from __future__ import annotations

import time
from datetime import datetime
from datetime import datetime, timezone
from unittest import mock
from unittest.mock import patch

Expand Down Expand Up @@ -528,7 +528,7 @@ def test_secondary_training_status_changed_false(self):
def test_secondary_training_status_message_status_changed(self):
now = datetime.now(tzlocal())
SECONDARY_STATUS_DESCRIPTION_1["LastModifiedTime"] = now
expected_time = datetime.utcfromtimestamp(time.mktime(now.timetuple())).strftime("%Y-%m-%d %H:%M:%S")
expected_time = now.astimezone(tz=timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
expected = f"{expected_time} {status} - {message}"
assert (
secondary_training_status_message(SECONDARY_STATUS_DESCRIPTION_1, SECONDARY_STATUS_DESCRIPTION_2)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

import logging
import time
from datetime import datetime as dt, timedelta
from datetime import datetime as dt, timedelta, timezone
from unittest import mock
from unittest.mock import ANY, Mock, call

Expand All @@ -40,7 +40,7 @@


def get_time_str(time_in_milliseconds):
dt_time = dt.utcfromtimestamp(time_in_milliseconds / 1000.0)
dt_time = dt.fromtimestamp(time_in_milliseconds / 1000.0, tz=timezone.utc)
return dt_time.strftime("%Y-%m-%d %H:%M:%S,000")


Expand Down
4 changes: 2 additions & 2 deletions tests/providers/celery/executors/test_celery_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import os
import signal
import sys
from datetime import datetime, timedelta
from datetime import timedelta
from unittest import mock

# leave this it is used by the test worker
Expand Down Expand Up @@ -183,7 +183,7 @@ def test_command_validation(self, command, raise_exception):

@pytest.mark.backend("mysql", "postgres")
def test_try_adopt_task_instances_none(self):
start_date = datetime.utcnow() - timedelta(days=2)
start_date = timezone.utcnow() - timedelta(days=2)

with DAG("test_try_adopt_task_instances_none"):
task_1 = BaseOperator(task_id="task_1", start_date=start_date)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -573,7 +573,7 @@ def test_invalid_executor_config(self, mock_get_kube_client, mock_kubernetes_job
try:
assert executor.event_buffer == {}
executor.execute_async(
key=("dag", "task", datetime.utcnow(), 1),
key=("dag", "task", timezone.utcnow(), 1),
queue=None,
command=["airflow", "tasks", "run", "true", "some_parameter"],
executor_config=k8s.V1Pod(
Expand Down Expand Up @@ -1540,7 +1540,7 @@ def test_process_status_pending(self):

def test_process_status_pending_deleted(self):
self.events.append({"type": "DELETED", "object": self.pod})
self.pod.metadata.deletion_timestamp = datetime.utcnow()
self.pod.metadata.deletion_timestamp = timezone.utcnow()

self._run()
self.assert_watcher_queue_called_once_with_state(State.FAILED)
Expand Down Expand Up @@ -1570,7 +1570,7 @@ def test_process_status_succeeded_dedup_label(self):

def test_process_status_succeeded_dedup_timestamp(self):
self.pod.status.phase = "Succeeded"
self.pod.metadata.deletion_timestamp = datetime.utcnow()
self.pod.metadata.deletion_timestamp = timezone.utcnow()
self.events.append({"type": "MODIFIED", "object": self.pod})

self._run()
Expand Down Expand Up @@ -1604,7 +1604,7 @@ def test_process_status_pod_adopted(self, ti_state):

def test_process_status_running_deleted(self):
self.pod.status.phase = "Running"
self.pod.metadata.deletion_timestamp = datetime.utcnow()
self.pod.metadata.deletion_timestamp = timezone.utcnow()
self.events.append({"type": "DELETED", "object": self.pod})

self._run()
Expand Down
42 changes: 16 additions & 26 deletions tests/providers/google/cloud/sensors/test_gcs.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
# under the License.
from __future__ import annotations

from datetime import datetime, timedelta, timezone
from datetime import datetime, timedelta
from unittest import mock

import pendulum
Expand Down Expand Up @@ -45,6 +45,7 @@
GCSPrefixBlobTrigger,
GCSUploadSessionTrigger,
)
from airflow.utils import timezone

TEST_BUCKET = "TEST_BUCKET"

Expand All @@ -67,15 +68,6 @@
TEST_MIN_OBJECTS = 1


@pytest.fixture()
def context():
"""
Creates an empty context.
"""
context = {"data_interval_end": datetime.utcnow()}
yield context


def next_time_side_effect():
"""
This each time this is called mock a time 10 seconds later
Expand Down Expand Up @@ -159,7 +151,7 @@ def test_gcs_object_existence_sensor_deferred(self, mock_hook):
)
mock_hook.return_value.exists.return_value = False
with pytest.raises(TaskDeferred) as exc:
task.execute(context)
task.execute({})
assert isinstance(exc.value.trigger, GCSBlobTrigger), "Trigger is not a GCSBlobTrigger"

@pytest.mark.parametrize(
Expand Down Expand Up @@ -213,7 +205,7 @@ def test_gcs_object_existence_async_sensor(self, mock_hook):
)
mock_hook.return_value.exists.return_value = False
with pytest.raises(TaskDeferred) as exc:
task.execute(context)
task.execute({})
assert isinstance(exc.value.trigger, GCSBlobTrigger), "Trigger is not a GCSBlobTrigger"

@pytest.mark.parametrize(
Expand Down Expand Up @@ -329,20 +321,20 @@ def test_gcs_object_update_async_sensor(self, mock_hook):
@pytest.mark.parametrize(
"soft_fail, expected_exception", ((False, AirflowException), (True, AirflowSkipException))
)
def test_gcs_object_update_async_sensor_execute_failure(self, context, soft_fail, expected_exception):
def test_gcs_object_update_async_sensor_execute_failure(self, soft_fail, expected_exception):
"""Tests that an AirflowException is raised in case of error event"""
self.OPERATOR.soft_fail = soft_fail
with pytest.raises(expected_exception):
self.OPERATOR.execute_complete(
context=context, event={"status": "error", "message": "test failure message"}
context={}, event={"status": "error", "message": "test failure message"}
)

def test_gcs_object_update_async_sensor_execute_complete(self, context):
def test_gcs_object_update_async_sensor_execute_complete(self):
"""Asserts that logging occurs as expected"""

with mock.patch.object(self.OPERATOR.log, "info") as mock_log_info:
self.OPERATOR.execute_complete(
context=context, event={"status": "success", "message": "Job completed"}
context={}, event={"status": "success", "message": "Job completed"}
)
mock_log_info.assert_called_with(
"Checking last updated time for object %s in bucket : %s", TEST_OBJECT, TEST_BUCKET
Expand Down Expand Up @@ -462,21 +454,21 @@ def test_gcs_object_with_prefix_existence_async_sensor(self, mock_hook):
"soft_fail, expected_exception", ((False, AirflowException), (True, AirflowSkipException))
)
def test_gcs_object_with_prefix_existence_async_sensor_execute_failure(
self, context, soft_fail, expected_exception
self, soft_fail, expected_exception
):
"""Tests that an AirflowException is raised in case of error event"""
self.OPERATOR.soft_fail = soft_fail
with pytest.raises(expected_exception):
self.OPERATOR.execute_complete(
context=context, event={"status": "error", "message": "test failure message"}
context={}, event={"status": "error", "message": "test failure message"}
)

def test_gcs_object_with_prefix_existence_async_sensor_execute_complete(self, context):
def test_gcs_object_with_prefix_existence_async_sensor_execute_complete(self):
"""Asserts that logging occurs as expected"""

with mock.patch.object(self.OPERATOR.log, "info") as mock_log_info:
self.OPERATOR.execute_complete(
context=context,
context={},
event={"status": "success", "message": "Job completed", "matches": [TEST_OBJECT]},
)
mock_log_info.assert_called_with("Resuming from trigger and checking status")
Expand Down Expand Up @@ -609,18 +601,16 @@ def test_gcs_upload_session_complete_async_sensor(self, mock_hook):
@pytest.mark.parametrize(
"soft_fail, expected_exception", ((False, AirflowException), (True, AirflowSkipException))
)
def test_gcs_upload_session_complete_sensor_execute_failure(self, context, soft_fail, expected_exception):
def test_gcs_upload_session_complete_sensor_execute_failure(self, soft_fail, expected_exception):
"""Tests that an AirflowException is raised in case of error event"""

self.OPERATOR.soft_fail = soft_fail
with pytest.raises(expected_exception):
self.OPERATOR.execute_complete(
context=context, event={"status": "error", "message": "test failure message"}
context={}, event={"status": "error", "message": "test failure message"}
)

def test_gcs_upload_session_complete_async_sensor_execute_complete(self, context):
def test_gcs_upload_session_complete_async_sensor_execute_complete(self):
"""Asserts that execute complete is completed as expected"""

assert self.OPERATOR.execute_complete(
context=context, event={"status": "success", "message": "success"}
)
assert self.OPERATOR.execute_complete(context={}, event={"status": "success", "message": "success"})
4 changes: 2 additions & 2 deletions tests/providers/google/cloud/triggers/test_gcs.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
from __future__ import annotations

import asyncio
from datetime import datetime, timedelta
from datetime import datetime, timedelta, timezone
from typing import Any
from unittest import mock
from unittest.mock import AsyncMock
Expand All @@ -41,7 +41,7 @@
TEST_GCP_CONN_ID = "TEST_GCP_CONN_ID"
TEST_POLLING_INTERVAL = 3.0
TEST_HOOK_PARAMS: dict[str, Any] = {}
TEST_TS_OBJECT = datetime.utcnow()
TEST_TS_OBJECT = datetime.now(tz=timezone.utc)


TEST_INACTIVITY_PERIOD = 5.0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

import os
from copy import deepcopy
from datetime import datetime, timedelta
from datetime import datetime, timedelta, timezone

from airflow.models.dag import DAG
from airflow.providers.amazon.aws.operators.s3 import S3CreateBucketOperator, S3DeleteBucketOperator
Expand Down Expand Up @@ -86,7 +86,7 @@
SCHEDULE: {
SCHEDULE_START_DATE: datetime(2015, 1, 1).date(),
SCHEDULE_END_DATE: datetime(2030, 1, 1).date(),
START_TIME_OF_DAY: (datetime.utcnow() + timedelta(minutes=1)).time(),
START_TIME_OF_DAY: (datetime.now(tz=timezone.utc) + timedelta(minutes=1)).time(),
},
TRANSFER_SPEC: {
AWS_S3_DATA_SOURCE: {BUCKET_NAME: BUCKET_SOURCE_AWS},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
from __future__ import annotations

import os
from datetime import datetime, timedelta
from datetime import datetime, timedelta, timezone
from pathlib import Path

from airflow.models.dag import DAG
Expand Down Expand Up @@ -83,7 +83,7 @@
SCHEDULE: {
SCHEDULE_START_DATE: datetime(2015, 1, 1).date(),
SCHEDULE_END_DATE: datetime(2030, 1, 1).date(),
START_TIME_OF_DAY: (datetime.utcnow() + timedelta(seconds=120)).time(),
START_TIME_OF_DAY: (datetime.now(tz=timezone.utc) + timedelta(seconds=120)).time(),
},
TRANSFER_SPEC: {
GCS_DATA_SOURCE: {BUCKET_NAME: BUCKET_NAME_SRC},
Expand Down

0 comments on commit 42f8d04

Please sign in to comment.