diff --git a/providers/src/airflow/providers/google/cloud/sensors/gcs.py b/providers/src/airflow/providers/google/cloud/sensors/gcs.py index 0d01f663fa37f..2b5782c77ad42 100644 --- a/providers/src/airflow/providers/google/cloud/sensors/gcs.py +++ b/providers/src/airflow/providers/google/cloud/sensors/gcs.py @@ -178,21 +178,10 @@ def ts_function(context): """ Act as a default callback for the GoogleCloudStorageObjectUpdatedSensor. - The default behaviour is check for the object being updated after the data interval's end, - or execution_date + interval on Airflow versions prior to 2.2 (before AIP-39 implementation). + The default behaviour is check for the object being updated after the data + interval's end. """ - try: - return context["data_interval_end"] - except KeyError: - from airflow.utils import timezone - - data_interval = context["dag"].infer_automated_data_interval( - timezone.coerce_datetime(context["execution_date"]) - ) - next_info = context["dag"].next_dagrun_info(data_interval, restricted=False) - if next_info is None: - return None - return next_info.data_interval.start + return context["data_interval_end"] class GCSObjectUpdateSensor(BaseSensorOperator): diff --git a/providers/tests/google/cloud/sensors/test_gcs.py b/providers/tests/google/cloud/sensors/test_gcs.py index a00cecc264ab5..8e9a5d3bba85b 100644 --- a/providers/tests/google/cloud/sensors/test_gcs.py +++ b/providers/tests/google/cloud/sensors/test_gcs.py @@ -20,7 +20,6 @@ from datetime import datetime, timedelta from unittest import mock -import pendulum import pytest from google.cloud.storage.retry import DEFAULT_RETRY @@ -35,7 +34,6 @@ GCSObjectsWithPrefixExistenceSensor, GCSObjectUpdateSensor, GCSUploadSessionCompleteSensor, - ts_function, ) from airflow.providers.google.cloud.triggers.gcs import ( GCSBlobTrigger, @@ -43,7 +41,6 @@ GCSPrefixBlobTrigger, GCSUploadSessionTrigger, ) -from airflow.utils import timezone TEST_BUCKET = "TEST_BUCKET" @@ -253,28 +250,6 @@ def test_gcs_object_existence_async_sensor_execute_complete(self): mock_log_info.assert_called_with("File %s was found in bucket %s.", TEST_OBJECT, TEST_BUCKET) -class TestTsFunction: - def test_should_support_datetime(self): - context = { - "dag": DAG( - dag_id=TEST_DAG_ID, schedule=timedelta(days=5), start_date=datetime(2019, 2, 14, 0, 0) - ), - "execution_date": datetime(2019, 2, 14, 0, 0), - } - result = ts_function(context) - assert datetime(2019, 2, 19, 0, 0, tzinfo=timezone.utc) == result - - def test_should_support_cron(self): - dag = DAG(dag_id=TEST_DAG_ID, start_date=datetime(2019, 2, 19, 0, 0), schedule="@weekly") - - context = { - "dag": dag, - "execution_date": datetime(2019, 2, 19), - } - result = ts_function(context) - assert pendulum.instance(datetime(2019, 2, 24)).isoformat() == result.isoformat() - - class TestGoogleCloudStorageObjectUpdatedSensor: @mock.patch("airflow.providers.google.cloud.sensors.gcs.GCSHook") def test_should_pass_argument_to_hook(self, mock_hook):