Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update Fivetran connector to v1.1.2 #1530

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 67 additions & 42 deletions dags/operators/backport/fivetran/hook.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,27 +29,27 @@ class FivetranHook(BaseHook):
:type retry_delay: float
"""

conn_name_attr = 'fivetran_conn_id'
default_conn_name = 'fivetran_default'
conn_type = 'fivetran'
hook_name = 'Fivetran'
api_user_agent = 'airflow_provider_fivetran/1.0.1'
api_protocol = 'https'
api_host = 'api.fivetran.com'
api_path_connectors = 'v1/connectors/'
conn_name_attr = "fivetran_conn_id"
default_conn_name = "fivetran_default"
conn_type = "fivetran"
hook_name = "Fivetran"
api_user_agent = "airflow_provider_fivetran/1.1.2"
api_protocol = "https"
api_host = "api.fivetran.com"
api_path_connectors = "v1/connectors/"

@staticmethod
def get_ui_field_behaviour() -> Dict:
"""Returns custom field behaviour"""
return {
"hidden_fields": ['schema', 'port', 'extra', 'host'],
"hidden_fields": ["schema", "port", "extra", "host"],
"relabeling": {
'login': 'Fivetran API Key',
'password': 'Fivetran API Secret',
"login": "Fivetran API Key",
"password": "Fivetran API Secret",
},
"placeholders": {
'login': 'api key',
'password': 'api secret',
"login": "api key",
"password": "api secret",
},
}

Expand All @@ -61,7 +61,7 @@ def __init__(
retry_limit: int = 3,
retry_delay: float = 1.0,
) -> None:
super().__init__(None) # Passing None fixes a runtime problem in Airflow 1
super().__init__(None) # Passing None fixes a runtime problem in Airflow 1
self.conn_id = fivetran_conn_id
self.fivetran_conn = fivetran_conn
self.timeout_seconds = timeout_seconds
Expand All @@ -88,9 +88,7 @@ def _do_api_call(self, endpoint_info, json=None):
auth = (self.fivetran_conn.login, self.fivetran_conn.password)
url = f"{self.api_protocol}://{self.api_host}/{endpoint}"

headers = {
"User-Agent": self.api_user_agent
}
headers = {"User-Agent": self.api_user_agent}

if method == "GET":
request_func = requests.get
Expand All @@ -110,7 +108,7 @@ def _do_api_call(self, endpoint_info, json=None):
data=json if method in ("POST", "PATCH") else None,
params=json if method in ("GET") else None,
auth=auth,
headers=headers
headers=headers,
)
response.raise_for_status()
return response.json()
Expand Down Expand Up @@ -159,6 +157,7 @@ def get_connector(self, connector_id):
:param connector_id: Fivetran connector_id, found in connector settings
page in the Fivetran user interface.
:type connector_id: str
:return: connector details
:rtype: Dict
"""
if connector_id == "":
Expand All @@ -167,7 +166,6 @@ def get_connector(self, connector_id):
resp = self._do_api_call(("GET", endpoint))
return resp["data"]


def check_connector(self, connector_id):
"""
Ensures connector configuration has been completed successfully and is in
Expand Down Expand Up @@ -195,54 +193,81 @@ def check_connector(self, connector_id):
)
return True

def set_manual_schedule(self, connector_id):
def set_schedule_type(self, connector_id, schedule_type):
"""
Set connector to manual sync mode, required to force sync through the API.
Syncs will no longer be performed automatically and must be started
via the API.
Set connector sync mode to switch sync control between API and UI.
:param connector_id: Fivetran connector_id, found in connector settings
page in the Fivetran user interface.
:type connector_id: str
:param schedule_type: "manual" (schedule controlled via Airlow) or "auto" (schedule controlled via Fivetran)
:type schedule_type: str
"""
endpoint = self.api_path_connectors + connector_id
return self._do_api_call(
("PATCH", endpoint),
json.dumps({"schedule_type": "manual"})
("PATCH", endpoint), json.dumps({"schedule_type": schedule_type})
)

def prep_connector(self, connector_id):
def prep_connector(self, connector_id, schedule_type):
"""
Prepare the connector to run in Airflow by checking that it exists and is a good state, then taking it off of Fivetran's schedule to be managed by Airflow's.
Prepare the connector to run in Airflow by checking that it exists and is a good state,
then update connector sync schedule type if changed.
:param connector_id: Fivetran connector_id, found in connector settings
page in the Fivetran user interface.
:type connector_id: str
:param schedule_type: Fivetran connector schedule type
:type schedule_type: str
"""
self.check_connector(connector_id)
if self.get_connector(connector_id)['schedule_type'] != 'manual':
return self.set_manual_schedule(connector_id)
if schedule_type not in {"manual", "auto"}:
raise ValueError('schedule_type must be either "manual" or "auto"')
if self.get_connector(connector_id)["schedule_type"] != schedule_type:
return self.set_schedule_type(connector_id, schedule_type)
return True

def start_fivetran_sync(self, connector_id):
"""
:param connector_id: Fivetran connector_id, found in connector settings
page in the Fivetran user interface.
:type connector_id: str
:return: Timestamp of previously completed sync
:rtype: str
"""
endpoint = self.api_path_connectors + connector_id + "/force"
return self._do_api_call(("POST", endpoint))
connector_details = self.get_connector(connector_id)
succeeded_at = connector_details["succeeded_at"]
failed_at = connector_details["failed_at"]
endpoint = self.api_path_connectors + connector_id
if self._do_api_call(("GET", endpoint))['data']['paused'] == True:
self._do_api_call(("PATCH", endpoint),json.dumps({"paused": False}))
if succeeded_at == None and failed_at == None:
succeeded_at = str(pendulum.now())
self._do_api_call(("POST", endpoint + "/force"))
last_sync = (
succeeded_at
if self._parse_timestamp(succeeded_at) > self._parse_timestamp(failed_at)
else failed_at
)
return last_sync

def get_last_sync(self, connector_id):
def get_last_sync(self, connector_id, xcom=""):
"""
Get the last time Fivetran connector completed a sync.
Used with FivetranSensor to monitor sync completion status.
:param connector_id: Fivetran connector_id, found in connector settings
page in the Fivetran user interface.
:type connector_id: str
:param xcom: Timestamp as string pull from FivetranOperator via XCOM
:type xcom: str
:return: Timestamp of last completed sync
:rtype: Pendulum.DateTime
"""
connector_details = self.get_connector(connector_id)
succeeded_at = self._parse_timestamp(connector_details["succeeded_at"])
failed_at = self._parse_timestamp(connector_details["failed_at"])
return succeeded_at if succeeded_at > failed_at else failed_at
if xcom:
last_sync = self._parse_timestamp(xcom)
else:
connector_details = self.get_connector(connector_id)
succeeded_at = self._parse_timestamp(connector_details["succeeded_at"])
failed_at = self._parse_timestamp(connector_details["failed_at"])
last_sync = succeeded_at if succeeded_at > failed_at else failed_at
return last_sync

def get_sync_status(self, connector_id, previous_completed_at):
"""
Expand All @@ -259,9 +284,7 @@ def get_sync_status(self, connector_id, previous_completed_at):
connector_details = self.get_connector(connector_id)
succeeded_at = self._parse_timestamp(connector_details["succeeded_at"])
failed_at = self._parse_timestamp(connector_details["failed_at"])
current_completed_at = (
succeeded_at if succeeded_at > failed_at else failed_at
)
current_completed_at = succeeded_at if succeeded_at > failed_at else failed_at

# The only way to tell if a sync failed is to check if its latest
# failed_at value is greater than then last known "sync completed at" value.
Expand All @@ -280,8 +303,10 @@ def get_sync_status(self, connector_id, previous_completed_at):
# Check if sync started by FivetranOperator has finished
# indicated by new 'succeeded_at' timestamp
if current_completed_at > previous_completed_at:
self.log.info('Connector "{}": succeeded_at: {}'.format(
connector_id, succeeded_at.to_iso8601_string())
self.log.info(
'Connector "{}": succeeded_at: {}'.format(
connector_id, succeeded_at.to_iso8601_string()
)
)
return True
else:
Expand Down Expand Up @@ -310,4 +335,4 @@ def _retryable_error(exception) -> bool:
)
or exception.response is not None
and exception.response.status_code >= 500
)
)