-
Notifications
You must be signed in to change notification settings - Fork 26
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Deprecate SFTPSensorAsync and proxy it to its Airflow OSS provider's counterpart closes: #1419
- Loading branch information
1 parent
17078e1
commit 8b691f1
Showing
5 changed files
with
31 additions
and
133 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,85 +1,24 @@ | ||
from datetime import timedelta | ||
from typing import Any, Dict, Optional | ||
import warnings | ||
from typing import Any | ||
|
||
from airflow.configuration import conf | ||
from airflow.providers.sftp.sensors.sftp import SFTPSensor | ||
|
||
from astronomer.providers.sftp.hooks.sftp import SFTPHookAsync | ||
from astronomer.providers.sftp.triggers.sftp import SFTPTrigger | ||
from astronomer.providers.utils.sensor_util import raise_error_or_skip_exception | ||
from astronomer.providers.utils.typing_compat import Context | ||
|
||
|
||
class SFTPSensorAsync(SFTPSensor): | ||
""" | ||
Polls an SFTP server continuously until a file_pattern is matched at a defined path | ||
:param path: The path on the SFTP server to search for a file matching the file pattern. | ||
Authentication method used in the SFTP connection must have access to this path | ||
:param file_pattern: Pattern to be used for matching against the list of files at the path above. | ||
Uses the fnmatch module from std library to perform the matching. | ||
:param timeout: How long, in seconds, the sensor waits for success before timing out | ||
:param newer_than: DateTime for which the file or file path should be newer than, comparison is inclusive | ||
This class is deprecated. | ||
Use :class:`~airflow.providers.sftp.sensors.sftp.SFTPSensor` instead | ||
and set `deferrable` param to `True`. | ||
""" | ||
|
||
def __init__( | ||
self, | ||
*, | ||
path: str, | ||
file_pattern: str = "", | ||
timeout: Optional[float] = None, | ||
*args: Any, | ||
**kwargs: Any, | ||
) -> None: | ||
self.path = path | ||
self.file_pattern = file_pattern | ||
if timeout is None: | ||
timeout = conf.getfloat("sensors", "default_timeout") | ||
super().__init__(path=path, file_pattern=file_pattern, timeout=timeout, **kwargs) | ||
self.hook = SFTPHookAsync(sftp_conn_id=self.sftp_conn_id) # type: ignore[assignment] | ||
|
||
def execute(self, context: Context) -> None: | ||
""" | ||
Logic that the sensor uses to correctly identify which trigger to | ||
execute, and defer execution as expected. | ||
""" | ||
# Unlike other async sensors, we do not follow the pattern of calling the synchronous self.poke() method before | ||
# deferring here. This is due to the current limitations we have in the synchronous SFTPHook methods. | ||
# The limitations were discovered while working on the ticket | ||
# https://github.com/astronomer/astronomer-providers/issues/1021. They are as follows: | ||
# 1. For host key types of ecdsa, the hook expects the host key to be prefixed with 'ssh-' as per the mapping of | ||
# key types defined in it to get the appropriate key constructor for the ecdsa type keys, whereas | ||
# conventionally such keys are not prefixed with 'ssh-'. | ||
# 2. The sync sensor does not support the newer_than field to be passed as a Jinja template value which is of | ||
# string type. | ||
# 3. For file_pattern sensing, the hook implements list_directory() method which returns a list of filenames | ||
# only without the attributes like modified time which is required for the file_pattern sensing when | ||
# newer_than is supplied. This leads to intermittent failures potentially due to throttling by the SFTP | ||
# server as the hook makes multiple calls to the server to get the attributes for each of the files in the | ||
# directory. This limitation is resolved here by instead calling the read_directory() method which returns a | ||
# list of files along with their attributes in a single call. | ||
# We can add back the call to self.poke() before deferring once the above limitations are resolved in the | ||
# sync sensor. | ||
self.defer( | ||
timeout=timedelta(seconds=self.timeout), | ||
trigger=SFTPTrigger( | ||
path=self.path, | ||
file_pattern=self.file_pattern, | ||
sftp_conn_id=self.sftp_conn_id, | ||
poke_interval=self.poke_interval, | ||
newer_than=self.newer_than, | ||
), | ||
method_name="execute_complete", | ||
warnings.warn( | ||
"This class is deprecated. " | ||
"Use `airflow.providers.sftp.sensors.sftp.SFTPSensor` instead " | ||
"and set `deferrable` param to `True` instead." | ||
) | ||
|
||
def execute_complete(self, context: Dict[str, Any], event: Any = None) -> None: | ||
""" | ||
Callback for when the trigger fires - returns immediately. | ||
Relies on trigger to throw an exception, otherwise it assumes execution was | ||
successful. | ||
""" | ||
if event is not None: | ||
if "status" in event and event["status"] == "error": | ||
raise_error_or_skip_exception(self.soft_fail, event["message"]) | ||
if "status" in event and event["status"] == "success": | ||
self.log.info("%s completed successfully.", self.task_id) | ||
self.log.info(event["message"]) | ||
super().__init__(*args, deferrable=True, **kwargs) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,49 +1,15 @@ | ||
from unittest import mock | ||
|
||
import pytest | ||
from airflow.exceptions import AirflowException, TaskDeferred | ||
from airflow.providers.sftp.sensors.sftp import SFTPSensor | ||
|
||
from astronomer.providers.sftp.sensors.sftp import SFTPSensorAsync | ||
from astronomer.providers.sftp.triggers.sftp import SFTPTrigger | ||
|
||
MODULE = "astronomer.providers.sftp.sensors.sftp" | ||
|
||
|
||
class TestSFTPSensorAsync: | ||
@mock.patch(f"{MODULE}.SFTPSensorAsync.poke", return_value=False) | ||
def test_sftp_run_now_sensor_async(self, context): | ||
""" | ||
Asserts that a task is deferred and a SFTPTrigger will be fired | ||
when the SFTPSensorAsync is executed. | ||
""" | ||
|
||
def test_init(self): | ||
task = SFTPSensorAsync( | ||
task_id="run_now", | ||
path="/test/path/", | ||
file_pattern="test_file", | ||
) | ||
|
||
with pytest.raises(TaskDeferred) as exc: | ||
task.execute(context) | ||
assert isinstance(exc.value.trigger, SFTPTrigger), "Trigger is not an SFTPTrigger" | ||
|
||
def test_sftp_execute_complete_success(self, context): | ||
""" | ||
Asserts that execute_complete doesn't raise an exception if the | ||
TriggerEvent is marked success | ||
""" | ||
task = SFTPSensorAsync(task_id="run_now", path="/test/path/", file_pattern="test_file") | ||
task.execute_complete(context, {"status": "success", "message": "some_file.txt"}) | ||
|
||
def test_sftp_execute_complete_failure(self, context): | ||
""" | ||
Asserts that execute_complete raises an exception if the | ||
TriggerEvent is marked failure | ||
""" | ||
|
||
task = SFTPSensorAsync(task_id="run_now", path="/test/path/", file_pattern="test_file") | ||
expected_message = "Some exception message" | ||
|
||
with pytest.raises(AirflowException) as exc: | ||
task.execute_complete(context, {"status": "error", "message": expected_message}) | ||
assert exc.message == expected_message | ||
assert isinstance(task, SFTPSensor) | ||
assert task.deferrable is True |