diff --git a/astronomer/providers/sftp/hooks/sftp.py b/astronomer/providers/sftp/hooks/sftp.py index 93fb4c94a..f229f8a27 100644 --- a/astronomer/providers/sftp/hooks/sftp.py +++ b/astronomer/providers/sftp/hooks/sftp.py @@ -1,6 +1,7 @@ from __future__ import annotations import os.path +import warnings from datetime import datetime from fnmatch import fnmatch from typing import Sequence @@ -14,17 +15,8 @@ class SFTPHookAsync(BaseHook): """ - Interact with an SFTP server via asyncssh package - - :param sftp_conn_id: SFTP connection ID to be used for connecting to SFTP server - :param host: hostname of the SFTP server - :param port: port of the SFTP server - :param username: username used when authenticating to the SFTP server - :param password: password used when authenticating to the SFTP server. - Can be left blank if using a key file - :param known_hosts: path to the known_hosts file on the local file system. Defaults to ``~/.ssh/known_hosts``. - :param key_file: path to the client key file used for authentication to SFTP server - :param passphrase: passphrase used with the key_file for authentication to SFTP server + This class is deprecated and will be removed in 2.0.0. + Use :class: `~airflow.providers.sftp.hooks.sftp.SFTPHookAsync` instead. """ conn_name_attr = "ssh_conn_id" @@ -45,6 +37,10 @@ def __init__( # nosec: B107 passphrase: str = "", private_key: str = "", ) -> None: + warnings.warn( + "This class is deprecated and will be removed in 2.0.0. " + "Use `airflow.providers.sftp.hooks.sftp.SFTPHookAsync` instead." + ) self.sftp_conn_id = sftp_conn_id self.host = host self.port = port diff --git a/astronomer/providers/sftp/sensors/sftp.py b/astronomer/providers/sftp/sensors/sftp.py index ccd1e7545..b70ed3cdb 100644 --- a/astronomer/providers/sftp/sensors/sftp.py +++ b/astronomer/providers/sftp/sensors/sftp.py @@ -1,85 +1,24 @@ -from datetime import timedelta -from typing import Any, Dict, Optional +import warnings +from typing import Any -from airflow.configuration import conf from airflow.providers.sftp.sensors.sftp import SFTPSensor -from astronomer.providers.sftp.hooks.sftp import SFTPHookAsync -from astronomer.providers.sftp.triggers.sftp import SFTPTrigger -from astronomer.providers.utils.sensor_util import raise_error_or_skip_exception -from astronomer.providers.utils.typing_compat import Context - class SFTPSensorAsync(SFTPSensor): """ - Polls an SFTP server continuously until a file_pattern is matched at a defined path - - :param path: The path on the SFTP server to search for a file matching the file pattern. - Authentication method used in the SFTP connection must have access to this path - :param file_pattern: Pattern to be used for matching against the list of files at the path above. - Uses the fnmatch module from std library to perform the matching. - :param timeout: How long, in seconds, the sensor waits for successful before timing out - :param newer_than: DateTime for which the file or file path should be newer than, comparison is inclusive + This class is deprecated. + Use :class: `~airflow.providers.sftp.sensors.sftp.SFTPSensor` instead + and set `deferrable` param to `True` instead. """ def __init__( self, - *, - path: str, - file_pattern: str = "", - timeout: Optional[float] = None, + *args: Any, **kwargs: Any, ) -> None: - self.path = path - self.file_pattern = file_pattern - if timeout is None: - timeout = conf.getfloat("sensors", "default_timeout") - super().__init__(path=path, file_pattern=file_pattern, timeout=timeout, **kwargs) - self.hook = SFTPHookAsync(sftp_conn_id=self.sftp_conn_id) # type: ignore[assignment] - - def execute(self, context: Context) -> None: - """ - Logic that the sensor uses to correctly identify which trigger to - execute, and defer execution as expected. - """ - # Unlike other async sensors, we do not follow the pattern of calling the synchronous self.poke() method before - # deferring here. This is due to the current limitations we have in the synchronous SFTPHook methods. - # The limitations are discovered while being worked upon the ticket - # https://github.com/astronomer/astronomer-providers/issues/1021. They are as follows: - # 1. For host key types of ecdsa, the hook expects the host key to prefixed with 'ssh-' as per the mapping of - # key types defined in it to get the appropriate key constructor for the ecdsa type keys, whereas - # conventionally such keys are not prefixed with 'ssh-'. - # 2. The sync sensor does not support the newer_than field to be passed as a Jinja template value which is of - # string type. - # 3. For file_pattern sensing, the hook implements list_directory() method which returns a list of filenames - # only without the attributes like modified time which is required for the file_pattern sensing when - # newer_than is supplied. This leads to intermittent failures potentially due to throttling by the SFTP - # server as the hook makes multiple calls to the server to get the attributes for each of the files in the - # directory.This limitation is resolved here by instead calling the read_directory() method which returns a - # list of files along with their attributes in a single call. - # We can add back the call to self.poke() before deferring once the above limitations are resolved in the - # sync sensor. - self.defer( - timeout=timedelta(seconds=self.timeout), - trigger=SFTPTrigger( - path=self.path, - file_pattern=self.file_pattern, - sftp_conn_id=self.sftp_conn_id, - poke_interval=self.poke_interval, - newer_than=self.newer_than, - ), - method_name="execute_complete", + warnings.warn( + "This class is deprecated. " + "Use `airflow.providers.sftp.sensors.sftp.SFTPSensor` instead " + "and set `deferrable` param to `True` instead." ) - - def execute_complete(self, context: Dict[str, Any], event: Any = None) -> None: - """ - Callback for when the trigger fires - returns immediately. - Relies on trigger to throw an exception, otherwise it assumes execution was - successful. - """ - if event is not None: - if "status" in event and event["status"] == "error": - raise_error_or_skip_exception(self.soft_fail, event["message"]) - if "status" in event and event["status"] == "success": - self.log.info("%s completed successfully.", self.task_id) - self.log.info(event["message"]) + super().__init__(*args, deferrable=True, **kwargs) diff --git a/astronomer/providers/sftp/triggers/sftp.py b/astronomer/providers/sftp/triggers/sftp.py index 247a3a327..d56ab347f 100644 --- a/astronomer/providers/sftp/triggers/sftp.py +++ b/astronomer/providers/sftp/triggers/sftp.py @@ -1,6 +1,7 @@ from __future__ import annotations import asyncio +import warnings from datetime import datetime from typing import Any, AsyncIterator @@ -14,16 +15,8 @@ class SFTPTrigger(BaseTrigger): """ - Trigger that fires when either the path on the SFTP server does not exist, - or when there are no files matching the file pattern at the path - - :param path: The path on the SFTP server to search for a file matching the file pattern. - Authentication method used in the SFTP connection must have access to this path - :param file_pattern: Pattern to be used for matching against the list of files at the path above. - Uses the fnmatch module from std library to perform the matching. - - :param sftp_conn_id: SFTP connection ID to be used for connecting to SFTP server - :param poke_interval: How often, in seconds, to check for the existence of the file on the SFTP server + This class is deprecated and will be removed in 2.0.0. + Use :class: `~airflow.providers.sftp.triggers.sftp.SFTPTrigger` instead. """ def __init__( @@ -34,6 +27,10 @@ def __init__( newer_than: datetime | str | None = None, poke_interval: float = 5, ) -> None: + warnings.warn( + "This class is deprecated and will be removed in 2.0.0. " + "Use `airflow.providers.sftp.triggers.sftp.SFTPTrigger` instead." + ) super().__init__() self.path = path self.file_pattern = file_pattern diff --git a/setup.cfg b/setup.cfg index 52db933d4..54d0607d4 100644 --- a/setup.cfg +++ b/setup.cfg @@ -69,7 +69,7 @@ http = microsoft.azure = apache-airflow-providers-microsoft-azure>=9.0.0 sftp = - apache-airflow-providers-sftp + apache-airflow-providers-sftp>=4.9.0 asyncssh>=2.12.0 snowflake = apache-airflow-providers-snowflake>=5.3.0 @@ -126,7 +126,7 @@ all = apache-airflow-providers-google>=10.14.0 apache-airflow-providers-http>=4.9.0 apache-airflow-providers-snowflake>=5.3.0 - apache-airflow-providers-sftp + apache-airflow-providers-sftp>=4.9.0 apache-airflow-providers-microsoft-azure>=9.0.0 asyncssh>=2.12.0 databricks-sql-connector>=2.0.4;python_version>='3.10' diff --git a/tests/sftp/sensors/test_sftp.py b/tests/sftp/sensors/test_sftp.py index dc98f8726..52e27a5be 100644 --- a/tests/sftp/sensors/test_sftp.py +++ b/tests/sftp/sensors/test_sftp.py @@ -1,49 +1,15 @@ -from unittest import mock - -import pytest -from airflow.exceptions import AirflowException, TaskDeferred +from airflow.providers.sftp.sensors.sftp import SFTPSensor from astronomer.providers.sftp.sensors.sftp import SFTPSensorAsync -from astronomer.providers.sftp.triggers.sftp import SFTPTrigger - -MODULE = "astronomer.providers.sftp.sensors.sftp" class TestSFTPSensorAsync: - @mock.patch(f"{MODULE}.SFTPSensorAsync.poke", return_value=False) - def test_sftp_run_now_sensor_async(self, context): - """ - Asserts that a task is deferred and a SFTPTrigger will be fired - when the SFTPSensorAsync is executed. - """ - + def test_init(self): task = SFTPSensorAsync( task_id="run_now", path="/test/path/", file_pattern="test_file", ) - with pytest.raises(TaskDeferred) as exc: - task.execute(context) - assert isinstance(exc.value.trigger, SFTPTrigger), "Trigger is not an SFTPTrigger" - - def test_sftp_execute_complete_success(self, context): - """ - Asserts that execute_complete doesn't raise an exception if the - TriggerEvent is marked success - """ - task = SFTPSensorAsync(task_id="run_now", path="/test/path/", file_pattern="test_file") - task.execute_complete(context, {"status": "success", "message": "some_file.txt"}) - - def test_sftp_execute_complete_failure(self, context): - """ - Asserts that execute_complete raises an exception if the - TriggerEvent is marked failure - """ - - task = SFTPSensorAsync(task_id="run_now", path="/test/path/", file_pattern="test_file") - expected_message = "Some exception message" - - with pytest.raises(AirflowException) as exc: - task.execute_complete(context, {"status": "error", "message": expected_message}) - assert exc.message == expected_message + assert isinstance(task, SFTPSensor) + assert task.deferrable is True