From 0aa2d14bc23b4de52b535ee00ca64f72689d75f6 Mon Sep 17 00:00:00 2001 From: msenardi Date: Mon, 11 May 2020 19:27:43 +0200 Subject: [PATCH] first commit --- .gitignore | 4 + README.md | 1 + airflow_add_ons/__init__.py | 0 airflow_add_ons/operators/__init__.py | 0 airflow_add_ons/operators/athena_operators.py | 66 +++ airflow_add_ons/operators/glue_operators.py | 372 +++++++++++++++ airflow_add_ons/operators/imap_operators.py | 435 ++++++++++++++++++ airflow_add_ons/operators/lambda_operators.py | 91 ++++ airflow_add_ons/operators/python_operators.py | 5 + airflow_add_ons/operators/zip_operators.py | 126 +++++ airflow_add_ons/sensors/__init__.py | 0 airflow_add_ons/sensors/glue_sensors.py | 69 +++ airflow_add_ons/sensors/s3_sensors.py | 31 ++ requirements/default.txt | 1 + setup.py | 39 ++ 15 files changed, 1240 insertions(+) create mode 100644 .gitignore create mode 100644 README.md create mode 100644 airflow_add_ons/__init__.py create mode 100644 airflow_add_ons/operators/__init__.py create mode 100644 airflow_add_ons/operators/athena_operators.py create mode 100644 airflow_add_ons/operators/glue_operators.py create mode 100644 airflow_add_ons/operators/imap_operators.py create mode 100644 airflow_add_ons/operators/lambda_operators.py create mode 100644 airflow_add_ons/operators/python_operators.py create mode 100644 airflow_add_ons/operators/zip_operators.py create mode 100644 airflow_add_ons/sensors/__init__.py create mode 100644 airflow_add_ons/sensors/glue_sensors.py create mode 100644 airflow_add_ons/sensors/s3_sensors.py create mode 100644 requirements/default.txt create mode 100644 setup.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..98971f9 --- /dev/null +++ b/.gitignore @@ -0,0 +1,4 @@ +.idea +.venv +tests +**/.ipynb_checkpoints/* diff --git a/README.md b/README.md new file mode 100644 index 0000000..aabeeed --- /dev/null +++ b/README.md @@ -0,0 +1 @@ +# airflow-add-ons diff --git a/airflow_add_ons/__init__.py b/airflow_add_ons/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/airflow_add_ons/operators/__init__.py b/airflow_add_ons/operators/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/airflow_add_ons/operators/athena_operators.py b/airflow_add_ons/operators/athena_operators.py new file mode 100644 index 0000000..a1fd0fb --- /dev/null +++ b/airflow_add_ons/operators/athena_operators.py @@ -0,0 +1,66 @@ +from airflow.hooks.base_hook import BaseHook +from airflow.models import BaseOperator +from airflow.utils import apply_defaults +from botocore.config import Config + +import boto3 + + +class AthenaQueryOperator(BaseOperator): + template_fields = ('query',) + + @apply_defaults + def __init__( + self, + query, + output_location, + aws_conn_id='aws_default', + read_timeout=2000, + workgroup='primary', + *args, + **kwargs + ): + """ + Trigger AWS Athena query + :param query: query to be executed from athena + :param read_timeout: read time in order to wait Resource response before closing connection + :param args: + :param kwargs: + """ + super(AthenaQueryOperator, self).__init__(*args, **kwargs) + + self.query = query + self.output_location = output_location + self.workgroup = workgroup + if read_timeout is not None: + print('check read_timeout') + print(read_timeout) + config = Config(read_timeout=read_timeout, retries={'max_attempts': 0}) + else: + config = Config(retries={'max_attempts': 0}) + if aws_conn_id is not None: + connection = BaseHook.get_connection(aws_conn_id) + self.client = 
boto3.client('athena', aws_access_key_id=connection.login, + aws_secret_access_key=connection.password, + config=config, + region_name=connection.extra_dejson.get('region_name')) + else: + raise AttributeError('Please pass a valid aws_connection_id') + + def execute(self, context): + + print('Executing AWS Athena query {} with workgroup {}'.format(self.query, self.workgroup)) + + response = self.client.start_query_execution( + QueryString=self.query, + ResultConfiguration={ + 'OutputLocation': self.output_location, + 'EncryptionConfiguration': {'EncryptionOption': 'SSE_S3'} + + }, + WorkGroup=self.workgroup + ) + + print('Query execution id {}'.format(response.get('QueryExecutionId'))) + + return response.get('QueryExecutionId') diff --git a/airflow_add_ons/operators/glue_operators.py b/airflow_add_ons/operators/glue_operators.py new file mode 100644 index 0000000..7648467 --- /dev/null +++ b/airflow_add_ons/operators/glue_operators.py @@ -0,0 +1,372 @@ +from airflow.hooks.base_hook import BaseHook +from airflow.models import BaseOperator +from airflow.utils import apply_defaults +from botocore.config import Config + +import boto3 + + +class GlueCrawlerOperator(BaseOperator): + template_fields = ('glue_crawler_name',) + + @apply_defaults + def __init__( + self, + glue_crawler_name, + aws_conn_id='aws_default', + read_timeout=2000, + *args, + **kwargs + ): + """ + Trigger AWS Glue crawler + :param glue_crawler_name: name of Glue crawler + :param read_timeout: read time in order to wait Resource response before closing connection + :param args: + :param kwargs: + """ + super(GlueCrawlerOperator, self).__init__(*args, **kwargs) + + self.glue_crawler_name = glue_crawler_name + if read_timeout is not None: + print('check read_timeout') + print(read_timeout) + config = Config(read_timeout=read_timeout, retries={'max_attempts': 0}) + else: + config = Config(retries={'max_attempts': 0}) + if aws_conn_id is not None: + connection = BaseHook.get_connection(aws_conn_id) + self.client = boto3.client('glue', aws_access_key_id=connection.login, + aws_secret_access_key=connection.password, + config=config, + region_name=connection.extra_dejson.get('region_name')) + else: + raise AttributeError('Please pass a valid aws_connection_id') + + def execute(self, context): + + print('Executing AWS Glue Crawler {}'.format(self.glue_crawler_name)) + + response = self.client.start_crawler(Name=self.glue_crawler_name) + + return response + + +from airflow.exceptions import AirflowException +from airflow.contrib.hooks.aws_hook import AwsHook +import os.path +import time + + +class AwsGlueJobHook(AwsHook): + """ + Interact with AWS Glue - create job, trigger, crawler + :param job_name: unique job name per AWS account + :type str + :param desc: job description + :type str + :param concurrent_run_limit: The maximum number of concurrent runs allowed for a job + :type int + :param script_location: path to etl script either on s3 or local + :type str + :param conns: A list of connections used by the job + :type list + :param retry_limit: Maximum number of times to retry this job if it fails + :type int + :param num_of_dpus: Number of AWS Glue DPUs to allocate to this Job + :type int + :param region_name: aws region name (example: us-east-1) + :type region_name: str + :param s3_bucket: S3 bucket where logs and local etl script will be uploaded + :type str + :param iam_role_name: AWS IAM Role for Glue Job + :type str + """ + + def __init__(self, + job_name=None, + desc=None, + concurrent_run_limit=None, + 
script_location=None, + conns=None, + retry_limit=None, + num_of_dpus=None, + aws_conn_id='aws_default', + region_name=None, + iam_role_name=None, + worker_type='Standard', + num_workers=1, + tags=None, + default_args=None, + s3_bucket=None, + overwrite_job=True, *args, **kwargs): + self.job_name = job_name + self.desc = desc + self.concurrent_run_limit = concurrent_run_limit or 1 + self.script_location = script_location + self.conns = conns or ["s3"] + self.retry_limit = retry_limit or 0 + self.num_of_dpus = num_of_dpus or 10 + self.aws_conn_id = aws_conn_id + self.region_name = region_name + self.s3_bucket = s3_bucket + self.role_name = iam_role_name + self.worker_type = worker_type + self.num_workers = num_workers + self.tags = tags + self.default_args = default_args + self.overwrite_job = overwrite_job + self.S3_PROTOCOL = "s3://" + self.S3_ARTIFACTS_PREFIX = 'artifacts/glue-scripts/' + self.S3_GLUE_LOGS = 'logs/glue-logs/' + super(AwsGlueJobHook, self).__init__(*args, **kwargs) + + def get_conn(self): + conn = self.get_client_type('glue', self.region_name) + return conn + + def list_jobs(self): + conn = self.get_conn() + return conn.get_jobs() + + def get_iam_execution_role(self): + """ + :return: iam role for job execution + """ + iam_client = self.get_client_type('iam', self.region_name) + + try: + glue_execution_role = iam_client.get_role(RoleName=self.role_name) + self.log.info("Iam Role Name: {}".format(self.role_name)) + return glue_execution_role + except Exception as general_error: + raise AirflowException( + 'Failed to create aws glue job, error: {error}'.format( + error=str(general_error) + ) + ) + + def initialize_job(self, script_arguments=None): + """ + Initializes connection with AWS Glue + to run job + :return: + """ + if self.s3_bucket is None: + raise AirflowException( + 'Could not initialize glue job, ' + 'error: Specify Parameter `s3_bucket`' + ) + + glue_client = self.get_conn() + + try: + job_response = self.get_or_create_glue_job() + job_name = job_response['Name'] + job_run = glue_client.start_job_run( + JobName=job_name, + Arguments=self.default_args + ) + return job_run['JobRunId'] + # return self.job_completion(job_name, job_run['JobRunId']) + except Exception as general_error: + raise AirflowException( + 'Failed to run aws glue job, error: {error}'.format( + error=str(general_error) + ) + ) + + def job_completion(self, job_name=None, run_id=None): + """ + :param job_name: + :param run_id: + :return: + """ + glue_client = self.get_conn() + job_status = glue_client.get_job_run( + JobName=job_name, + RunId=run_id, + PredecessorsIncluded=True + ) + job_run_state = job_status['JobRun']['JobRunState'] + failed = job_run_state == 'FAILED' + stopped = job_run_state == 'STOPPED' + completed = job_run_state == 'SUCCEEDED' + + while True: + if failed or stopped or completed: + self.log.info("Exiting Job {} Run State: {}" + .format(run_id, job_run_state)) + return {'JobRunState': job_run_state, 'JobRunId': run_id} + else: + self.log.info("Polling for AWS Glue Job {} current run state" + .format(job_name)) + time.sleep(6) + + def get_or_create_glue_job(self): + glue_client = self.get_conn() + try: + self.log.info("Now creating and running AWS Glue Job") + s3_log_path = "s3://{bucket_name}/{logs_path}{job_name}" \ + .format(bucket_name=self.s3_bucket, + logs_path=self.S3_GLUE_LOGS, + job_name=self.job_name) + + execution_role = self.get_iam_execution_role() + script_location = self._check_script_location() + if self.overwrite_job: + try: + 
len(glue_client.get_job(JobName=self.job_name).get('Job')) + except Exception as e: + if 'EntityNotFoundException' in str(e) or 'IdempotentParameterMismatchException' in str(e): + self.log.info('Deleting {}'.format(self.job_name)) + glue_client.delete_job(JobName=self.job_name) + else: + raise AirflowException( + 'Failed to get aws glue job, error: {error}'.format( + error=str(e) + ) + ) + + params = { + 'Description': self.desc, + 'LogUri': s3_log_path, + 'Role': execution_role['Role']['RoleName'], + 'ExecutionProperty': {"MaxConcurrentRuns": self.concurrent_run_limit}, + 'Command': {"Name": "glueetl", "ScriptLocation": script_location}, + 'WorkerType': self.worker_type, + 'NumberOfWorkers': self.num_workers, + 'MaxRetries': self.retry_limit, + # AllocatedCapacity=self.num_of_dpus, # TODO: handle AllocatedCapacity with NumberOfWorkers + 'Tags': self.tags, + # 'GlueVersion': '1.0' + 'DefaultArguments': self.default_args, + } + create_job_response = glue_client.create_job( + Name=self.job_name, + **params + ) + # print(create_job_response) + return create_job_response + except Exception as general_error: + raise AirflowException( + 'Failed to create aws glue job, error: {error}'.format( + error=str(general_error) + ) + ) + + def _check_script_location(self): + """ + :return: S3 Script location path + """ + if self.script_location[:5] == self.S3_PROTOCOL: + return self.script_location + elif os.path.isfile(self.script_location): + s3 = self.get_resource_type('s3', self.region_name) + script_name = os.path.basename(self.script_location) + s3.meta.client.upload_file(self.script_location, + self.s3_bucket, + self.S3_ARTIFACTS_PREFIX + script_name) + + s3_script_path = "s3://{s3_bucket}/{prefix}{job_name}/{script_name}" \ + .format(s3_bucket=self.s3_bucket, + prefix=self.S3_ARTIFACTS_PREFIX, + job_name=self.job_name, + script_name=script_name) + return s3_script_path + else: + return None + + +class AWSGlueJobOperator(BaseOperator): + """ + Creates an AWS Glue Job. AWS Glue is a serverless Spark + ETL service for running Spark Jobs on the AWS cloud. + Language support: Python and Scala + :param job_name: unique job name per AWS Account + :type str + :param script_location: location of ETL script. Must be a local or S3 path + :type str + :param job_desc: job description details + :type str + :param concurrent_run_limit: The maximum number of concurrent runs allowed for a job + :type int + :param script_args: etl script arguments and AWS Glue arguments + :type dict + :param connections: AWS Glue connections to be used by the job. + :type list + :param retry_limit: The maximum number of times to retry this job if it fails + :type int + :param num_of_dpus: Number of AWS Glue DPUs to allocate to this Job. 
+ :type int + :param region_name: aws region name (example: us-east-1) + :type region_name: str + :param s3_bucket: S3 bucket where logs and local etl script will be uploaded + :type str + :param iam_role_name: AWS IAM Role for Glue Job Execution + :type str + """ + template_fields = ('script_args', 'script_location', 's3_bucket', 'iam_role_name',) + template_ext = () + ui_color = '#ededed' + + @apply_defaults + def __init__(self, + job_name='aws_glue_default_job', + job_desc='AWS Glue Job with Airflow', + script_location=None, + concurrent_run_limit=None, + script_args={}, + connections=[], + retry_limit=None, + num_of_dpus=3, + aws_conn_id='aws_default', + region_name=None, + s3_bucket=None, + iam_role_name=None, + worker_type='Standard', # | 'G.1X' | 'G.2X' + num_workers=2, + tags=None, + *args, **kwargs + ): + super(AWSGlueJobOperator, self).__init__(*args, **kwargs) + self.job_name = job_name + self.job_desc = job_desc + self.script_location = script_location + self.concurrent_run_limit = concurrent_run_limit + self.script_args = script_args + self.connections = connections + self.retry_limit = retry_limit + self.num_of_dpus = num_of_dpus + self.aws_conn_id = aws_conn_id + self.region_name = region_name + self.s3_bucket = s3_bucket + self.iam_role_name = iam_role_name + self.worker_type = worker_type + self.num_workers = num_workers + self.tags = tags + + def execute(self, context): + """ + Executes AWS Glue Job from Airflow + :return: + """ + glue_job = AwsGlueJobHook(job_name=self.job_name, + desc=self.job_desc, + concurrent_run_limit=self.concurrent_run_limit, + script_location=self.script_location, + conns=self.connections, + retry_limit=self.retry_limit, + num_of_dpus=self.num_of_dpus, + aws_conn_id=self.aws_conn_id, + region_name=self.region_name, + s3_bucket=self.s3_bucket, + iam_role_name=self.iam_role_name, + worker_type=self.worker_type, + num_workers=self.num_workers, + default_args=self.script_args, + tags=self.tags) + + self.log.info("Initializing AWS Glue Job: {}".format(self.job_name)) + job_run_id = glue_job.initialize_job(None) + return job_run_id diff --git a/airflow_add_ons/operators/imap_operators.py b/airflow_add_ons/operators/imap_operators.py new file mode 100644 index 0000000..14db6e1 --- /dev/null +++ b/airflow_add_ons/operators/imap_operators.py @@ -0,0 +1,435 @@ +from airflow.hooks.S3_hook import S3Hook +from airflow.models import BaseOperator +from airflow.utils.decorators import apply_defaults +import email +import imaplib +import os +import re + +from airflow import AirflowException, LoggingMixin +from airflow.hooks.base_hook import BaseHook + + +class ImapHook(BaseHook): + """ + This hook connects to a mail server by using the imap protocol. + + .. note:: Please call this Hook as context manager via `with` + to automatically open and close the connection to the mail server. + + :param imap_conn_id: The connection id that contains the information used to authenticate the client. + :type imap_conn_id: str + """ + + def __init__(self, imap_conn_id='imap_default'): + self.imap_conn_id = imap_conn_id + self.mail_client = None + + def __enter__(self): + return self.get_conn() + + + def __exit__(self, exc_type, exc_val, exc_tb): + self.mail_client.logout() + + + def get_conn(self): + """ + Login to the mail server. + + .. note:: Please call this Hook as context manager via `with` + to automatically open and close the connection to the mail server. + + :return: an authorized ImapHook object. 
+ :rtype: ImapHook + """ + + if not self.mail_client: + conn = self.get_connection(self.imap_conn_id) + self.mail_client = imaplib.IMAP4_SSL(conn.host) + self.mail_client.login(conn.login, conn.password) + + return self + + + def has_mail_attachment(self, name, *, check_regex=False, mail_folder='INBOX', mail_filter='All'): + """ + Checks the mail folder for mails containing attachments with the given name. + + :param name: The name of the attachment that will be searched for. + :type name: str + :param check_regex: Checks the name for a regular expression. + :type check_regex: bool + :param mail_folder: The mail folder where to look at. + :type mail_folder: str + :param mail_filter: If set other than 'All' only specific mails will be checked. + See :py:meth:`imaplib.IMAP4.search` for details. + :type mail_filter: str + :returns: True if there is an attachment with the given name and False if not. + :rtype: bool + """ + mail_attachments = self._retrieve_mails_attachments_by_name(name, + check_regex, + True, + mail_folder, + mail_filter) + return len(mail_attachments) > 0 + + + def retrieve_mail_attachments(self, + name, + *, + check_regex=False, + latest_only=False, + mail_folder='INBOX', + mail_filter='All', + not_found_mode='raise'): + """ + Retrieves mail's attachments in the mail folder by its name. + + :param name: The name of the attachment that will be downloaded. + :type name: str + :param check_regex: Checks the name for a regular expression. + :type check_regex: bool + :param latest_only: If set to True it will only retrieve the first matched attachment. + :type latest_only: bool + :param mail_folder: The mail folder where to look at. + :type mail_folder: str + :param mail_filter: If set other than 'All' only specific mails will be checked. + See :py:meth:`imaplib.IMAP4.search` for details. + :type mail_filter: str + :param not_found_mode: Specify what should happen if no attachment has been found. + Supported values are 'raise', 'warn' and 'ignore'. + If it is set to 'raise' it will raise an exception, + if set to 'warn' it will only print a warning and + if set to 'ignore' it won't notify you at all. + :type not_found_mode: str + :returns: a list of tuple each containing the attachment filename and its payload. + :rtype: a list of tuple + """ + mail_attachments = self._retrieve_mails_attachments_by_name(name, + check_regex, + latest_only, + mail_folder, + mail_filter) + + if not mail_attachments: + self._handle_not_found_mode(not_found_mode) + + return mail_attachments + + + def download_mail_attachments(self, + name, + local_output_directory, + *, + check_regex=False, + latest_only=False, + mail_folder='INBOX', + mail_filter='All', + not_found_mode='raise'): + """ + Downloads mail's attachments in the mail folder by its name to the local directory. + + :param name: The name of the attachment that will be downloaded. + :type name: str + :param local_output_directory: The output directory on the local machine + where the files will be downloaded to. + :type local_output_directory: str + :param check_regex: Checks the name for a regular expression. + :type check_regex: bool + :param latest_only: If set to True it will only download the first matched attachment. + :type latest_only: bool + :param mail_folder: The mail folder where to look at. + :type mail_folder: str + :param mail_filter: If set other than 'All' only specific mails will be checked. + See :py:meth:`imaplib.IMAP4.search` for details. 
+ :type mail_filter: str + :param not_found_mode: Specify what should happen if no attachment has been found. + Supported values are 'raise', 'warn' and 'ignore'. + If it is set to 'raise' it will raise an exception, + if set to 'warn' it will only print a warning and + if set to 'ignore' it won't notify you at all. + :type not_found_mode: str + """ + mail_attachments = self._retrieve_mails_attachments_by_name(name, + check_regex, + latest_only, + mail_folder, + mail_filter) + + if not mail_attachments: + self._handle_not_found_mode(not_found_mode) + + self._create_files(mail_attachments, local_output_directory) + + + def _handle_not_found_mode(self, not_found_mode): + if not_found_mode == 'raise': + raise AirflowException('No mail attachments found!') + if not_found_mode == 'warn': + self.log.warning('No mail attachments found!') + elif not_found_mode == 'ignore': + pass # Do not notify if the attachment has not been found. + else: + self.log.error('Invalid "not_found_mode" %s', not_found_mode) + + + def _retrieve_mails_attachments_by_name(self, name, check_regex, latest_only, mail_folder, mail_filter): + all_matching_attachments = [] + + self.mail_client.select(mail_folder) + + for mail_id in self._list_mail_ids_desc(mail_filter): + response_mail_body = self._fetch_mail_body(mail_id) + matching_attachments = self._check_mail_body(response_mail_body, name, check_regex, latest_only) + + if matching_attachments: + all_matching_attachments.extend(matching_attachments) + if latest_only: + break + + self.mail_client.close() + + return all_matching_attachments + + + def _list_mail_ids_desc(self, mail_filter): + _, data = self.mail_client.search(None, mail_filter) + mail_ids = data[0].split() + return reversed(mail_ids) + + + def _fetch_mail_body(self, mail_id): + _, data = self.mail_client.fetch(mail_id, '(RFC822)') + mail_body = data[0][1] # The mail body is always in this specific location + mail_body_str = mail_body.decode('utf-8') + return mail_body_str + + + def _check_mail_body(self, response_mail_body, name, check_regex, latest_only): + mail = Mail(response_mail_body) + if mail.has_attachments(): + return mail.get_attachments_by_name(name, check_regex, find_first=latest_only) + return [] + + + def _create_files(self, mail_attachments, local_output_directory): + for name, payload in mail_attachments: + if self._is_symlink(name): + self.log.error('Can not create file because it is a symlink!') + elif self._is_escaping_current_directory(name): + self.log.error('Can not create file because it is escaping the current directory!') + else: + self._create_file(name, payload, local_output_directory) + + + def _is_symlink(self, name): + # IMPORTANT NOTE: os.path.islink is not working for windows symlinks + # See: https://stackoverflow.com/a/11068434 + return os.path.islink(name) + + + def _is_escaping_current_directory(self, name): + return '../' in name + + + def _correct_path(self, name, local_output_directory): + return local_output_directory + name if local_output_directory.endswith('/') \ + else local_output_directory + '/' + name + + + def _create_file(self, name, payload, local_output_directory): + file_path = self._correct_path(name, local_output_directory) + + with open(file_path, 'wb') as file: + file.write(payload) + + + +class Mail(LoggingMixin): + """ + This class simplifies working with mails returned by the imaplib client. + + :param mail_body: The mail body of a mail received from imaplib client. 
+ :type mail_body: str + """ + + def __init__(self, mail_body): + super().__init__() + self.mail = email.message_from_string(mail_body) + + def has_attachments(self): + """ + Checks the mail for a attachments. + + :returns: True if it has attachments and False if not. + :rtype: bool + """ + return self.mail.get_content_maintype() == 'multipart' + + + def get_attachments_by_name(self, name, check_regex, find_first=False): + """ + Gets all attachments by name for the mail. + + :param name: The name of the attachment to look for. + :type name: str + :param check_regex: Checks the name for a regular expression. + :type check_regex: bool + :param find_first: If set to True it will only find the first match and then quit. + :type find_first: bool + :returns: a list of tuples each containing name and payload + where the attachments name matches the given name. + :rtype: list of tuple + """ + attachments = [] + + for attachment in self._iterate_attachments(): + found_attachment = attachment.has_matching_name(name) if check_regex \ + else attachment.has_equal_name(name) + if found_attachment: + file_name, file_payload = attachment.get_file() + self.log.info('Found attachment: {}'.format(file_name)) + attachments.append((file_name, file_payload)) + if find_first: + break + + return attachments + + + def _iterate_attachments(self): + for part in self.mail.walk(): + mail_part = MailPart(part) + if mail_part.is_attachment(): + yield mail_part + + + +class MailPart: + """ + This class is a wrapper for a Mail object's part and gives it more features. + + :param part: The mail part in a Mail object. + :type part: any + """ + + def __init__(self, part): + self.part = part + + def is_attachment(self): + """ + Checks if the part is a valid mail attachment. + + :returns: True if it is an attachment and False if not. + :rtype: bool + """ + return self.part.get_content_maintype() != 'multipart' and self.part.get('Content-Disposition') + + + def has_matching_name(self, name): + """ + Checks if the given name matches the part's name. + + :param name: The name to look for. + :type name: str + :returns: True if it matches the name (including regular expression). + :rtype: tuple + """ + return re.match(name, self.part.get_filename()) + + + def has_equal_name(self, name): + """ + Checks if the given name is equal to the part's name. + + :param name: The name to look for. + :type name: str + :returns: True if it is equal to the given name. + :rtype: bool + """ + return self.part.get_filename() == name + + + def get_file(self): + """ + Gets the file including name and payload. + + :returns: the part's name and payload. + :rtype: tuple + """ + return self.part.get_filename(), self.part.get_payload(decode=True) + + +class ImapAttachmentToS3Operator(BaseOperator): + """ + Transfers a mail attachment from a mail server into s3 bucket. + + :param imap_attachment_name: The file name of the mail attachment that you want to transfer. + :type imap_attachment_name: str + :param s3_key: The destination file name in the s3 bucket for the attachment. + :type s3_key: str + :param imap_check_regex: If set checks the `imap_attachment_name` for a regular expression. + :type imap_check_regex: bool + :param imap_mail_folder: The folder on the mail server to look for the attachment. + :type imap_mail_folder: str + :param imap_mail_filter: If set other than 'All' only specific mails will be checked. + See :py:meth:`imaplib.IMAP4.search` for details. 
+ :type imap_mail_filter: str + :param s3_overwrite: If set overwrites the s3 key if already exists. + :type s3_overwrite: bool + :param imap_conn_id: The reference to the connection details of the mail server. + :type imap_conn_id: str + :param s3_conn_id: The reference to the s3 connection details. + :type s3_conn_id: str + """ + template_fields = ('imap_attachment_name', 's3_key', 'imap_mail_filter') + + + @apply_defaults + def __init__(self, + imap_attachment_name, + s3_key, + imap_check_regex=False, + imap_mail_folder='INBOX', + imap_mail_filter='All', + s3_overwrite=False, + imap_conn_id='imap_default', + s3_conn_id='aws_default', + *args, + **kwargs): + super().__init__(*args, **kwargs) + self.imap_attachment_name = imap_attachment_name + self.s3_key = s3_key + self.imap_check_regex = imap_check_regex + self.imap_mail_folder = imap_mail_folder + self.imap_mail_filter = imap_mail_filter + self.s3_overwrite = s3_overwrite + self.imap_conn_id = imap_conn_id + self.s3_conn_id = s3_conn_id + + def execute(self, context): + """ + This function executes the transfer from the email server (via imap) into s3. + + :param context: The context while executing. + :type context: dict + """ + self.log.info( + 'Transferring mail attachment %s from mail server via imap to s3 key %s...', + self.imap_attachment_name, self.s3_key + ) + + with ImapHook(imap_conn_id=self.imap_conn_id) as imap_hook: + imap_mail_attachments = imap_hook.retrieve_mail_attachments( + name=self.imap_attachment_name, + check_regex=self.imap_check_regex, + latest_only=True, + mail_folder=self.imap_mail_folder, + mail_filter=self.imap_mail_filter, + ) + + s3_hook = S3Hook(aws_conn_id=self.s3_conn_id) + s3_hook.load_bytes(bytes_data=imap_mail_attachments[0][1], + key=self.s3_key, + replace=self.s3_overwrite) diff --git a/airflow_add_ons/operators/lambda_operators.py b/airflow_add_ons/operators/lambda_operators.py new file mode 100644 index 0000000..e0496c7 --- /dev/null +++ b/airflow_add_ons/operators/lambda_operators.py @@ -0,0 +1,91 @@ +from airflow.hooks.base_hook import BaseHook +from airflow.models import BaseOperator +from airflow.utils import apply_defaults +from airflow.exceptions import AirflowException +from botocore.config import Config + +import boto3 +import json +import base64 + + +class ExecuteLambdaOperator(BaseOperator): + template_fields = ('additional_payload',) + + @apply_defaults + def __init__( + self, + lambda_function_name, + airflow_context_to_lambda_payload=None, + additional_payload=None, + aws_conn_id='aws_default', + read_timeout=2000, + *args, + **kwargs + ): + """ + Trigger AWS Lambda function + :param lambda_function_name: name of Lambda function + :param airflow_context_to_lambda_payload: function extracting fields from Airflow context to Lambda payload + :param additional_payload: additional parameters for Lambda payload + :param aws_conn_id: aws connection id in order to call Lambda function + :param read_timeout: read time in order to wait Resource response before closing connection + :param args: + :param kwargs: + """ + super(ExecuteLambdaOperator, self).__init__(*args, **kwargs) + if additional_payload is None: + additional_payload = {} + self.airflow_context_to_lambda_payload = airflow_context_to_lambda_payload + + self.additional_payload = additional_payload + self.lambda_function_name = lambda_function_name + if read_timeout is not None: + print('check read_timeout') + print(read_timeout) + config = Config(read_timeout=read_timeout, retries={'max_attempts': 0}) + else: + config = 
Config(retries={'max_attempts': 0}) + if aws_conn_id is not None: + connection = BaseHook.get_connection(aws_conn_id) + self.lambda_client = boto3.client('lambda', aws_access_key_id=connection.login, + aws_secret_access_key=connection.password, + config=config, + region_name=connection.extra_dejson.get('region_name')) + else: + raise AttributeError('Please pass a valid aws_connection_id') + + def execute(self, context): + request_payload = self.__create_lambda_payload(context) + + print('Executing AWS Lambda {} with payload {}'.format(self.lambda_function_name, request_payload)) + + response = self.lambda_client.invoke( + FunctionName=self.lambda_function_name, + InvocationType='RequestResponse', + Payload=json.dumps(request_payload), + LogType='Tail' + ) + + response_log_tail = base64.b64decode(response.get('LogResult')) + response_payload = json.loads(response.get('Payload').read()) + response_code = response.get('StatusCode') + + log_msg_logs = 'Tail of logs from AWS Lambda:\n{logs}'.format(logs=response_log_tail) + log_msg_payload = 'Response payload from AWS Lambda:\n{resp}'.format(resp=response_payload) + + if response_code == 200: + print(log_msg_logs) + print(log_msg_payload) + return response_code + else: + print(log_msg_logs) + print(log_msg_payload) + raise AirflowException('Lambda invoke failed') + + def __create_lambda_payload(self, context): + payload = self.airflow_context_to_lambda_payload( + context) if self.airflow_context_to_lambda_payload is not None else {} + payload.update(self.additional_payload) + print('payload: {}'.format(payload)) + return payload diff --git a/airflow_add_ons/operators/python_operators.py b/airflow_add_ons/operators/python_operators.py new file mode 100644 index 0000000..e926645 --- /dev/null +++ b/airflow_add_ons/operators/python_operators.py @@ -0,0 +1,5 @@ +from airflow.operators.python_operator import PythonOperator + + +class TemplatedPythonOperator(PythonOperator): + template_fields = ('templates_dict', 'op_args', 'op_kwargs') diff --git a/airflow_add_ons/operators/zip_operators.py b/airflow_add_ons/operators/zip_operators.py new file mode 100644 index 0000000..85700f6 --- /dev/null +++ b/airflow_add_ons/operators/zip_operators.py @@ -0,0 +1,126 @@ +from airflow.plugins_manager import AirflowPlugin +from airflow.models import BaseOperator +from airflow.utils import apply_defaults + +from zipfile import ZipFile +import os +import logging + + +class ZipOperator(BaseOperator): + """ + An operator which takes in a path to a file and zips the contents to a location you define. 
+ :param path_to_file_to_zip: Full path to the file you want to Zip + :type path_to_file_to_zip: string + :param path_to_save_zip: Full path to where you want to save the Zip file + :type path_to_save_zip: string + """ + + template_fields = ('path_to_file_to_zip', 'path_to_save_zip') + template_ext = [] + ui_color = '#ffffff' # ZipOperator's Main Color: white # todo: find better color + + @apply_defaults + def __init__( + self, + path_to_file_to_zip, + path_to_save_zip, + *args, **kwargs): + super(ZipOperator, self).__init__(*args, **kwargs) + self.path_to_file_to_zip = path_to_file_to_zip + self.path_to_save_zip = path_to_save_zip + + def execute(self, context): + logging.info("Executing ZipOperator.execute(context)") + + logging.info("Path to the File to Zip provided by the User (path_to_file_to_zip): " + str(self.path_to_file_to_zip)) + logging.info("Path to save the Zip File provided by the User (path_to_save_zip) : " + str(self.path_to_save_zip)) + + dir_path_to_file_to_zip = os.path.dirname(os.path.abspath(self.path_to_file_to_zip)) + logging.info("Absolute path to the File to Zip: " + str(dir_path_to_file_to_zip)) + + zip_file_name = os.path.basename(self.path_to_save_zip) + logging.info("Zip File Name: " + str(zip_file_name)) + + file_to_zip_name = os.path.basename(self.path_to_file_to_zip) + logging.info("Name of the File or Folder to be Zipped: " + str(file_to_zip_name)) + + os.chdir(dir_path_to_file_to_zip) + logging.info("Current Working Directory: " + str(os.getcwd())) + + with ZipFile(zip_file_name, 'w') as zip_file: + logging.info("Created zip file object '" + str(zip_file) + "' with name '" + str(zip_file_name) + "'") + is_file = os.path.isfile(self.path_to_file_to_zip) + logging.info("Is the File to Zip a File (else its a folder): " + str(is_file)) + if is_file: + logging.info("Writing '" + str(file_to_zip_name) + "to zip file") + zip_file.write(file_to_zip_name) + else: # is folder + for dirname, subdirs, files in os.walk(file_to_zip_name): + logging.info("Writing '" + str(dirname) + "to zip file") + zip_file.write(dirname) + for filename in files: + file_name_to_write = os.path.join(dirname, filename) + logging.info("Writing '" + str(file_name_to_write) + "to zip file") + zip_file.write(file_name_to_write) + + logging.info("Closing Zip File Object") + zip_file.close() + + logging.info("Moving '" + str(zip_file_name) + "' to '" + str(self.path_to_save_zip) + "'") + os.rename(zip_file_name, self.path_to_save_zip) + + logging.info("Finished executing ZipOperator.execute(context)") + return self.path_to_file_to_zip + + +class UnzipOperator(BaseOperator): + """ + An operator which takes in a path to a zip file and unzips the contents to a location you define. 
+ :param path_to_zip_file: Full path to the zip file you want to Unzip + :type path_to_zip_file: string + :param path_to_unzip_contents: Full path to where you want to save the contents of the Zip file you're Unzipping + :type path_to_unzip_contents: string + """ + + template_fields = ('path_to_zip_file', 'path_to_unzip_contents') + template_ext = [] + ui_color = '#ffffff' # UnzipOperator's Main Color: white # todo: find better color + + @apply_defaults + def __init__( + self, + path_to_zip_file, + path_to_unzip_contents, + *args, **kwargs): + super(UnzipOperator, self).__init__(*args, **kwargs) + + self.path_to_zip_file = path_to_zip_file + self.path_to_unzip_contents = path_to_unzip_contents + + def execute(self, context): + logging.info("Executing UnzipOperator.execute(context)") + + logging.info("path_to_zip_file: " + str(self.path_to_zip_file)) + logging.info("path_to_unzip_contents: " + str(self.path_to_unzip_contents)) + + # No check is done if the zip file is valid so that the operator fails when expected so that airflow can properly mark the task as failed and schedule retries as needed + with ZipFile(self.path_to_zip_file, 'r') as zip_file: + logging.info("Created zip file object '" + str(zip_file) + "' from path '" + str(self.path_to_zip_file) + "'") + logging.info("Extracting all the contents to '" + str(self.path_to_unzip_contents) + "'") + zip_file.extractall(self.path_to_unzip_contents) + logging.info("Closing Zip File Object") + zip_file.close() + + logging.info("Finished executing UnzipOperator.execute(context)") + + +# Defining the plugin class +class ZipOperatorPlugin(AirflowPlugin): + name = "zip_operator_plugin" + operators = [ZipOperator, UnzipOperator] + flask_blueprints = [] + hooks = [] + executors = [] + admin_views = [] + menu_links = [] diff --git a/airflow_add_ons/sensors/__init__.py b/airflow_add_ons/sensors/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/airflow_add_ons/sensors/glue_sensors.py b/airflow_add_ons/sensors/glue_sensors.py new file mode 100644 index 0000000..828bc7f --- /dev/null +++ b/airflow_add_ons/sensors/glue_sensors.py @@ -0,0 +1,69 @@ +from airflow.contrib.hooks.aws_glue_catalog_hook import AwsGlueCatalogHook +from airflow.sensors.base_sensor_operator import BaseSensorOperator +from airflow.utils import apply_defaults +from airflow.exceptions import AirflowException + + +class ExtendedAwsGlueCatalogHook(AwsGlueCatalogHook): + + def get_job_run(self, job_name, job_run_id): + return self.get_conn().get_job_run(JobName=job_name, RunId=job_run_id) + + +class GlueJobFlowSensor(BaseSensorOperator): + """ + Asks for the state of the JobFlow until it reaches a terminal state. + If it fails the sensor errors, failing the task. 
+ + :param job_name: job name to check the state of + :type job_name: string + :param job_run_id: job run identifier to check the state of + :type job_run_id: string + """ + + ui_color = '#66c3ff' + + NON_TERMINAL_STATES = ['STARTING', 'RUNNING', 'STOPPING', 'STOPPED'] + FAILED_STATE = ['FAILED', 'TIMEOUT'] + template_fields = ('job_name', 'job_run_id') + template_ext = () + + @apply_defaults + def __init__(self, + aws_conn_id, + job_name, + job_run_id, + *args, + **kwargs): + super(GlueJobFlowSensor, self).__init__(*args, **kwargs) + self.aws_conn_id = aws_conn_id + self.job_name = job_name + self.job_run_id = job_run_id + + def poke(self, context): + response = self.get_glue_job_response() + + if not response['ResponseMetadata']['HTTPStatusCode'] == 200: + self.log.info('Bad HTTP response: %s', response) + return False + + state = self.state_from_response(response) + self.log.info('Job flow currently %s', state) + + if state in self.NON_TERMINAL_STATES: + return False + + if state in self.FAILED_STATE: + raise AirflowException('Glue job failed') + + return True + + def get_glue_job_response(self): + glue = ExtendedAwsGlueCatalogHook(aws_conn_id=self.aws_conn_id) + + self.log.info('Poking for job %s %s', self.job_name, self.job_run_id) + return glue.get_job_run(job_name=self.job_name, job_run_id=self.job_run_id) + + @staticmethod + def state_from_response(response): + return response['JobRun']['JobRunState'] diff --git a/airflow_add_ons/sensors/s3_sensors.py b/airflow_add_ons/sensors/s3_sensors.py new file mode 100644 index 0000000..b84fd88 --- /dev/null +++ b/airflow_add_ons/sensors/s3_sensors.py @@ -0,0 +1,31 @@ +from airflow.hooks.S3_hook import S3Hook +from airflow.operators.sensors import S3KeySensor + + +class ReturnS3KeySensor(S3KeySensor): + + def __init__(self, + delimiter='/', + *args, + **kwargs): + super(ReturnS3KeySensor, self).__init__(*args, **kwargs) + self.delimiter = delimiter + + def get_object_key(self): + s3 = S3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify) + s3_object = s3.get_wildcard_key( + bucket_name=self.bucket_name, + wildcard_key=self.bucket_key, + delimiter=self.delimiter, + ) + + if s3_object is None: + raise AttributeError( + 'file not found in {}:{}'.format(self.bucket_name, self.bucket_key) + ) + + return s3_object.key + + def execute(self, context): + super(ReturnS3KeySensor, self).execute(context) + return self.get_object_key() diff --git a/requirements/default.txt b/requirements/default.txt new file mode 100644 index 0000000..54dfaa3 --- /dev/null +++ b/requirements/default.txt @@ -0,0 +1 @@ +apache-airflow[celery,crypto,hive,password,postgres,s3,slack,gcp]==1.10.10 \ No newline at end of file diff --git a/setup.py b/setup.py new file mode 100644 index 0000000..99b538e --- /dev/null +++ b/setup.py @@ -0,0 +1,39 @@ +import os + +from setuptools import setup, find_packages + + +def strip_comments(l): + return l.split('#', 1)[0].strip() + + +def _pip_requirement(req): + if req.startswith('-r '): + _, path = req.split() + return reqs(*path.split('/')) + return [req] + + +def _reqs(*f): + return [ + _pip_requirement(r) for r in ( + strip_comments(l) for l in open( + os.path.join(os.getcwd(), 'requirements', *f)).readlines() + ) if r] + + +def reqs(*f): + return [req for subreq in _reqs(*f) for req in subreq] + + +setup(name='airflow-add-ons', + version='0.1.1', + url='https://github.com/pualien/airflow-add-ons', + license='MIT', + author='Matteo Senardi', + author_email='pualien@gmail.com', + description='Airflow extensible opertators and sensors', 
+ packages=find_packages(exclude=['tests']), + install_requires=reqs('default.txt'), + long_description=open('README.md').read(), + zip_safe=False)
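Example usage (an illustrative sketch, not taken from the patch): it assumes the package is installed in an Airflow 1.10 environment, that 'aws_default' and 'imap_default' connections exist (the AWS connection carrying IAM credentials and a 'region_name' key in its Extra field), and that every resource name below (bucket, script path, Glue job, crawler, Lambda function, IAM role, Athena table) is a hypothetical placeholder.

from datetime import datetime

from airflow import DAG
from airflow_add_ons.operators.athena_operators import AthenaQueryOperator
from airflow_add_ons.operators.glue_operators import AWSGlueJobOperator, GlueCrawlerOperator
from airflow_add_ons.operators.imap_operators import ImapAttachmentToS3Operator
from airflow_add_ons.operators.lambda_operators import ExecuteLambdaOperator
from airflow_add_ons.sensors.glue_sensors import GlueJobFlowSensor

with DAG(
    dag_id='airflow_add_ons_example',
    start_date=datetime(2020, 5, 1),
    schedule_interval='@daily',
    catchup=False,
) as dag:

    # Pull the newest mail attachment matching the pattern into S3.
    fetch_attachment = ImapAttachmentToS3Operator(
        task_id='fetch_attachment',
        imap_attachment_name=r'report_.*\.csv',       # hypothetical attachment pattern
        imap_check_regex=True,
        s3_key='s3://my-data-bucket/raw/report.csv',  # hypothetical key
        s3_overwrite=True,
    )

    # Create (or overwrite) and start a Glue ETL job; execute() pushes the JobRunId to XCom.
    run_glue_job = AWSGlueJobOperator(
        task_id='run_glue_job',
        job_name='example-etl-job',                   # hypothetical job name
        script_location='s3://my-data-bucket/scripts/etl.py',
        s3_bucket='my-data-bucket',
        iam_role_name='GlueExecutionRole',            # hypothetical role
        tags={'team': 'data'},
    )

    # Poll the Glue run until it reaches a terminal state, pulling the run id from XCom.
    wait_for_glue = GlueJobFlowSensor(
        task_id='wait_for_glue',
        aws_conn_id='aws_default',
        job_name='example-etl-job',
        job_run_id="{{ task_instance.xcom_pull(task_ids='run_glue_job') }}",
        poke_interval=60,
    )

    # Re-crawl the job output so the Glue catalog picks up new partitions.
    crawl_output = GlueCrawlerOperator(
        task_id='crawl_output',
        glue_crawler_name='etl-output-crawler',       # hypothetical crawler
    )

    # Run an Athena aggregation; the operator returns the QueryExecutionId.
    aggregate_daily = AthenaQueryOperator(
        task_id='aggregate_daily',
        query="SELECT count(*) FROM etl_db.events WHERE ds = '{{ ds }}'",
        output_location='s3://my-data-bucket/athena-results/',
    )

    # Notify downstream consumers through a Lambda, forwarding the execution date.
    notify = ExecuteLambdaOperator(
        task_id='notify',
        lambda_function_name='notify-consumers',      # hypothetical function
        airflow_context_to_lambda_payload=lambda context: {'ds': context['ds']},
    )

    fetch_attachment >> run_glue_job >> wait_for_glue >> crawl_output >> aggregate_daily >> notify

Because 'job_run_id' is a templated field on GlueJobFlowSensor and AWSGlueJobOperator returns the Glue JobRunId from execute(), the sensor can read the run id straight from XCom, as shown above.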
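Example usage (an illustrative sketch, not taken from the patch): it assumes the package is installed in an Airflow 1.10 environment, that 'aws_default' and 'imap_default' connections exist (the AWS connection carrying IAM credentials and a 'region_name' key in its Extra field), and that every resource name below (bucket, script path, Glue job, crawler, Lambda function, IAM role, Athena table) is a hypothetical placeholder.

from datetime import datetime

from airflow import DAG
from airflow_add_ons.operators.athena_operators import AthenaQueryOperator
from airflow_add_ons.operators.glue_operators import AWSGlueJobOperator, GlueCrawlerOperator
from airflow_add_ons.operators.imap_operators import ImapAttachmentToS3Operator
from airflow_add_ons.operators.lambda_operators import ExecuteLambdaOperator
from airflow_add_ons.sensors.glue_sensors import GlueJobFlowSensor

with DAG(
    dag_id='airflow_add_ons_example',
    start_date=datetime(2020, 5, 1),
    schedule_interval='@daily',
    catchup=False,
) as dag:

    # Pull the newest mail attachment matching the pattern into S3.
    fetch_attachment = ImapAttachmentToS3Operator(
        task_id='fetch_attachment',
        imap_attachment_name=r'report_.*\.csv',       # hypothetical attachment pattern
        imap_check_regex=True,
        s3_key='s3://my-data-bucket/raw/report.csv',  # hypothetical key
        s3_overwrite=True,
    )

    # Create (or overwrite) and start a Glue ETL job; execute() pushes the JobRunId to XCom.
    run_glue_job = AWSGlueJobOperator(
        task_id='run_glue_job',
        job_name='example-etl-job',                   # hypothetical job name
        script_location='s3://my-data-bucket/scripts/etl.py',
        s3_bucket='my-data-bucket',
        iam_role_name='GlueExecutionRole',            # hypothetical role
        tags={'team': 'data'},
    )

    # Poll the Glue run until it reaches a terminal state, pulling the run id from XCom.
    wait_for_glue = GlueJobFlowSensor(
        task_id='wait_for_glue',
        aws_conn_id='aws_default',
        job_name='example-etl-job',
        job_run_id="{{ task_instance.xcom_pull(task_ids='run_glue_job') }}",
        poke_interval=60,
    )

    # Re-crawl the job output so the Glue catalog picks up new partitions.
    crawl_output = GlueCrawlerOperator(
        task_id='crawl_output',
        glue_crawler_name='etl-output-crawler',       # hypothetical crawler
    )

    # Run an Athena aggregation; the operator returns the QueryExecutionId.
    aggregate_daily = AthenaQueryOperator(
        task_id='aggregate_daily',
        query="SELECT count(*) FROM etl_db.events WHERE ds = '{{ ds }}'",
        output_location='s3://my-data-bucket/athena-results/',
    )

    # Notify downstream consumers through a Lambda, forwarding the execution date.
    notify = ExecuteLambdaOperator(
        task_id='notify',
        lambda_function_name='notify-consumers',      # hypothetical function
        airflow_context_to_lambda_payload=lambda context: {'ds': context['ds']},
    )

    fetch_attachment >> run_glue_job >> wait_for_glue >> crawl_output >> aggregate_daily >> notify

Because 'job_run_id' is a templated field on GlueJobFlowSensor and AWSGlueJobOperator returns the Glue JobRunId from execute(), the sensor can read the run id straight from XCom, as shown above.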