Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Deprecate async SFTP sensor #1470

Merged
merged 3 commits into from
Feb 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 7 additions & 11 deletions astronomer/providers/sftp/hooks/sftp.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import annotations

import os.path
import warnings
from datetime import datetime
from fnmatch import fnmatch
from typing import Sequence
Expand All @@ -14,17 +15,8 @@

class SFTPHookAsync(BaseHook):
"""
Interact with an SFTP server via asyncssh package

:param sftp_conn_id: SFTP connection ID to be used for connecting to SFTP server
:param host: hostname of the SFTP server
:param port: port of the SFTP server
:param username: username used when authenticating to the SFTP server
:param password: password used when authenticating to the SFTP server.
Can be left blank if using a key file
:param known_hosts: path to the known_hosts file on the local file system. Defaults to ``~/.ssh/known_hosts``.
:param key_file: path to the client key file used for authentication to SFTP server
:param passphrase: passphrase used with the key_file for authentication to SFTP server
This class is deprecated and will be removed in 2.0.0.
Use :class:`~airflow.providers.sftp.hooks.sftp.SFTPHookAsync` instead.
"""

conn_name_attr = "ssh_conn_id"
Expand All @@ -45,6 +37,10 @@ def __init__( # nosec: B107
passphrase: str = "",
private_key: str = "",
) -> None:
warnings.warn(
"This class is deprecated and will be removed in 2.0.0. "
"Use `airflow.providers.sftp.hooks.sftp.SFTPHookAsync` instead."
)
self.sftp_conn_id = sftp_conn_id
self.host = host
self.port = port
Expand Down
83 changes: 11 additions & 72 deletions astronomer/providers/sftp/sensors/sftp.py
Original file line number Diff line number Diff line change
@@ -1,85 +1,24 @@
from datetime import timedelta
from typing import Any, Dict, Optional
import warnings
from typing import Any

from airflow.configuration import conf
from airflow.providers.sftp.sensors.sftp import SFTPSensor

from astronomer.providers.sftp.hooks.sftp import SFTPHookAsync
from astronomer.providers.sftp.triggers.sftp import SFTPTrigger
from astronomer.providers.utils.sensor_util import raise_error_or_skip_exception
from astronomer.providers.utils.typing_compat import Context


class SFTPSensorAsync(SFTPSensor):
"""
Polls an SFTP server continuously until a file_pattern is matched at a defined path

:param path: The path on the SFTP server to search for a file matching the file pattern.
Authentication method used in the SFTP connection must have access to this path
:param file_pattern: Pattern to be used for matching against the list of files at the path above.
Uses the fnmatch module from std library to perform the matching.
:param timeout: How long, in seconds, the sensor waits for success before timing out
:param newer_than: DateTime for which the file or file path should be newer than, comparison is inclusive
This class is deprecated.
Use :class:`~airflow.providers.sftp.sensors.sftp.SFTPSensor` instead
and set the ``deferrable`` param to ``True``.
"""

def __init__(
self,
*,
path: str,
file_pattern: str = "",
timeout: Optional[float] = None,
*args: Any,
**kwargs: Any,
) -> None:
self.path = path
self.file_pattern = file_pattern
if timeout is None:
timeout = conf.getfloat("sensors", "default_timeout")
super().__init__(path=path, file_pattern=file_pattern, timeout=timeout, **kwargs)
self.hook = SFTPHookAsync(sftp_conn_id=self.sftp_conn_id) # type: ignore[assignment]

def execute(self, context: Context) -> None:
"""
Logic that the sensor uses to correctly identify which trigger to
execute, and defer execution as expected.
"""
# Unlike other async sensors, we do not follow the pattern of calling the synchronous self.poke() method before
# deferring here. This is due to the current limitations we have in the synchronous SFTPHook methods.
# The limitations were discovered while working on the ticket
# https://github.com/astronomer/astronomer-providers/issues/1021. They are as follows:
# 1. For host key types of ecdsa, the hook expects the host key to be prefixed with 'ssh-' as per the mapping of
# key types defined in it to get the appropriate key constructor for the ecdsa type keys, whereas
# conventionally such keys are not prefixed with 'ssh-'.
# 2. The sync sensor does not support the newer_than field to be passed as a Jinja template value which is of
# string type.
# 3. For file_pattern sensing, the hook implements list_directory() method which returns a list of filenames
# only without the attributes like modified time which is required for the file_pattern sensing when
# newer_than is supplied. This leads to intermittent failures potentially due to throttling by the SFTP
# server as the hook makes multiple calls to the server to get the attributes for each of the files in the
# directory. This limitation is resolved here by instead calling the read_directory() method which returns a
# list of files along with their attributes in a single call.
# We can add back the call to self.poke() before deferring once the above limitations are resolved in the
# sync sensor.
self.defer(
timeout=timedelta(seconds=self.timeout),
trigger=SFTPTrigger(
path=self.path,
file_pattern=self.file_pattern,
sftp_conn_id=self.sftp_conn_id,
poke_interval=self.poke_interval,
newer_than=self.newer_than,
),
method_name="execute_complete",
warnings.warn(
"This class is deprecated. "
"Use `airflow.providers.sftp.sensors.sftp.SFTPSensor` instead "
"and set `deferrable` param to `True` instead."
)

def execute_complete(self, context: Dict[str, Any], event: Any = None) -> None:
"""
Callback for when the trigger fires - returns immediately.
Relies on trigger to throw an exception, otherwise it assumes execution was
successful.
"""
if event is not None:
if "status" in event and event["status"] == "error":
raise_error_or_skip_exception(self.soft_fail, event["message"])
if "status" in event and event["status"] == "success":
self.log.info("%s completed successfully.", self.task_id)
self.log.info(event["message"])
super().__init__(*args, deferrable=True, **kwargs)
17 changes: 7 additions & 10 deletions astronomer/providers/sftp/triggers/sftp.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import annotations

import asyncio
import warnings
from datetime import datetime
from typing import Any, AsyncIterator

Expand All @@ -14,16 +15,8 @@

class SFTPTrigger(BaseTrigger):
"""
Trigger that fires when either the path on the SFTP server does not exist,
or when there are no files matching the file pattern at the path

:param path: The path on the SFTP server to search for a file matching the file pattern.
Authentication method used in the SFTP connection must have access to this path
:param file_pattern: Pattern to be used for matching against the list of files at the path above.
Uses the fnmatch module from std library to perform the matching.

:param sftp_conn_id: SFTP connection ID to be used for connecting to SFTP server
:param poke_interval: How often, in seconds, to check for the existence of the file on the SFTP server
This class is deprecated and will be removed in 2.0.0.
Use :class:`~airflow.providers.sftp.triggers.sftp.SFTPTrigger` instead.
"""

def __init__(
Expand All @@ -34,6 +27,10 @@ def __init__(
newer_than: datetime | str | None = None,
poke_interval: float = 5,
) -> None:
warnings.warn(
"This class is deprecated and will be removed in 2.0.0. "
"Use `airflow.providers.sftp.triggers.sftp.SFTPTrigger` instead."
)
super().__init__()
self.path = path
self.file_pattern = file_pattern
Expand Down
4 changes: 2 additions & 2 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ http =
microsoft.azure =
apache-airflow-providers-microsoft-azure>=9.0.0
sftp =
apache-airflow-providers-sftp
apache-airflow-providers-sftp>=4.9.0
asyncssh>=2.12.0
snowflake =
apache-airflow-providers-snowflake>=5.3.0
Expand Down Expand Up @@ -126,7 +126,7 @@ all =
apache-airflow-providers-google>=10.14.0
apache-airflow-providers-http>=4.9.0
apache-airflow-providers-snowflake>=5.3.0
apache-airflow-providers-sftp
apache-airflow-providers-sftp>=4.9.0
apache-airflow-providers-microsoft-azure>=9.0.0
asyncssh>=2.12.0
databricks-sql-connector>=2.0.4;python_version>='3.10'
Expand Down
42 changes: 4 additions & 38 deletions tests/sftp/sensors/test_sftp.py
Original file line number Diff line number Diff line change
@@ -1,49 +1,15 @@
from unittest import mock

import pytest
from airflow.exceptions import AirflowException, TaskDeferred
from airflow.providers.sftp.sensors.sftp import SFTPSensor

from astronomer.providers.sftp.sensors.sftp import SFTPSensorAsync
from astronomer.providers.sftp.triggers.sftp import SFTPTrigger

MODULE = "astronomer.providers.sftp.sensors.sftp"


class TestSFTPSensorAsync:
@mock.patch(f"{MODULE}.SFTPSensorAsync.poke", return_value=False)
def test_sftp_run_now_sensor_async(self, context):
"""
Asserts that a task is deferred and a SFTPTrigger will be fired
when the SFTPSensorAsync is executed.
"""

def test_init(self):
task = SFTPSensorAsync(
task_id="run_now",
path="/test/path/",
file_pattern="test_file",
)

with pytest.raises(TaskDeferred) as exc:
task.execute(context)
assert isinstance(exc.value.trigger, SFTPTrigger), "Trigger is not an SFTPTrigger"

def test_sftp_execute_complete_success(self, context):
"""
Asserts that execute_complete doesn't raise an exception if the
TriggerEvent is marked success
"""
task = SFTPSensorAsync(task_id="run_now", path="/test/path/", file_pattern="test_file")
task.execute_complete(context, {"status": "success", "message": "some_file.txt"})

def test_sftp_execute_complete_failure(self, context):
"""
Asserts that execute_complete raises an exception if the
TriggerEvent is marked failure
"""

task = SFTPSensorAsync(task_id="run_now", path="/test/path/", file_pattern="test_file")
expected_message = "Some exception message"

with pytest.raises(AirflowException) as exc:
task.execute_complete(context, {"status": "error", "message": expected_message})
assert exc.message == expected_message
assert isinstance(task, SFTPSensor)
assert task.deferrable is True
Loading