diff --git a/HISTORY.rst b/HISTORY.rst
index 7736b01..79a997c 100644
--- a/HISTORY.rst
+++ b/HISTORY.rst
@@ -3,6 +3,12 @@ History
 =======
 
+v0.17.21 (2022-11-08)
+
+* Add zipfile to tsv/csv file function in file_ingestion.py.
+* Add new unixodbc driver path: /opt/microsoft/msodbcsql17/lib64/libmsodbcsql-17.10.so.1.1.
+
+
 v0.17.20 (2022-11-02)
 
 * Allow xlsx_to_tsv function to work on python3.7+ by removing python3.10 specific typing using |.
 
diff --git a/aioradio/file_ingestion.py b/aioradio/file_ingestion.py
index 268285c..a881037 100644
--- a/aioradio/file_ingestion.py
+++ b/aioradio/file_ingestion.py
@@ -10,6 +10,7 @@
 # pylint: disable=too-many-branches
 # pylint: disable=too-many-instance-attributes
 # pylint: disable=too-many-lines
+# pylint: disable=too-many-locals
 # pylint: disable=too-many-nested-blocks
 # pylint: disable=too-many-public-methods
@@ -28,10 +29,11 @@
 from dataclasses import field as dc_field
 from datetime import datetime, timedelta, timezone, tzinfo
 from pathlib import Path
-from tempfile import NamedTemporaryFile
+from tempfile import NamedTemporaryFile, TemporaryDirectory
 from types import coroutine
 from typing import Any, Dict, List, Union
 
+import cchardet as chardet
 import mandrill
 import numpy as np
 from openpyxl import load_workbook
@@ -1623,13 +1625,13 @@ async def get_ftp_file_attributes(conn: SMBConnection, service_name: str, ftp_pa
 
 
 async def xlsx_to_tsv(
-        s3_source_bucket: str,
-        s3_source_key: str,
-        s3_destination_bucket: str,
-        s3_destination_key: str,
-        delimiter: str='\t'
+    s3_source_bucket: str,
+    s3_source_key: str,
+    s3_destination_bucket: str,
+    s3_destination_key: str,
+    delimiter: str='\t'
 ) -> Union[str, None]:
-    """Convert and xlsx file to csv/tsv file.
+    """Convert xlsx file to csv/tsv file.
 
     Args:
         s3_source_bucket (str): source xlsx file s3 bucket
         s3_source_key (str): source xlsx file s3 key
         s3_destination_bucket (str): destination xlsx file s3 bucket
         s3_destination_key (str): destination xlsx file s3 key
         delimiter (str, optional): Delimiter. Defaults to '\t'.
@@ -1643,36 +1645,213 @@
     Returns:
         Union[str, None]: Error message during process else None
     """
 
     try:
+        with NamedTemporaryFile(suffix='.xlsx') as tmp:
+            await download_file(bucket=s3_source_bucket, filepath=tmp.name, s3_key=s3_source_key)
+            records, _ = xlsx_to_records(tmp)
+
+        await tsv_to_s3(records, delimiter, s3_destination_bucket, s3_destination_key)
+    except Exception as err:
+        print(err)
+        return str(err)
+
+    return None
+
+
+async def zipfile_to_tsv(
+    s3_source_bucket: str,
+    s3_source_key: str,
+    s3_destination_bucket: str,
+    s3_destination_key: str,
+    delimiter: str='\t'
+) -> Union[str, None]:
+    """Convert zipfile to csv/tsv file.
+
+    Args:
+        s3_source_bucket (str): source zipfile s3 bucket
+        s3_source_key (str): source zipfile s3 key
+        s3_destination_bucket (str): destination zipfile s3 bucket
+        s3_destination_key (str): destination zipfile s3 key
+        delimiter (str, optional): Delimiter. Defaults to '\t'.
+
+    Returns:
+        Union[str, None]: Error message during process else None
+    """
+
+    try:
+        extensions = ['xlsx', 'txt', 'csv', 'tsv']
         records = []
         header = None
 
-        with NamedTemporaryFile(suffix='.xlsx') as tmp:
+        with NamedTemporaryFile(suffix='.zip') as tmp:
             await download_file(bucket=s3_source_bucket, filepath=tmp.name, s3_key=s3_source_key)
-            workbook = load_workbook(tmp.name, read_only=True)
-            for sheet in workbook:
-                sheet.calculate_dimension(force=True)
-
-                for idx, row in enumerate(sheet.values):
-                    items = [str(value) if value is not None else "" for value in row]
-
-                    if idx == 0:
-                        if header is None:
-                            header = items
-                        elif header != items:
-                            raise ValueError("Excel sheets must contain the exact same header")
+            with TemporaryDirectory() as tmp_directory:
+                for path in await unzip_file_get_filepaths(tmp.name, tmp_directory, include_extensions=extensions):
+                    ext = os.path.splitext(path)[1].lower()
+                    if ext == '.xlsx':
+                        records_from_path, header = xlsx_to_records(path, header)
+                        records.extend(records_from_path)
+                    else:
+                        encoding = detect_encoding(path)
+                        if encoding is None:
+                            raise IOError(f"Failed to detect proper encoding for {path}")
+                        encodings = [encoding] + [i for i in ['UTF-8', 'LATIN-1', 'UTF-16'] if i != encoding]
+                        for encoding in encodings:
+                            try:
+                                delimiter = detect_delimiter(path, encoding)
+                                if delimiter:
+                                    try:
+                                        records_from_path, header = tsv_to_records(path, encoding, delimiter, header)
+                                        records.extend(records_from_path)
+                                        break
+                                    except Exception as err:
+                                        if str(err) == 'Every file must contain the exact same header':
+                                            raise ValueError('Every file must contain the exact same header') from err
+                                        continue
+                            except Exception as err:
+                                if str(err) == 'Every file must contain the exact same header':
+                                    raise ValueError('Every file must contain the exact same header') from err
+                                continue
                         else:
-                            continue
+                            raise IOError(f"Failed to detect proper encoding for {path}")
 
-                    records.append(items)
-            workbook.close()
-
-        with NamedTemporaryFile(mode='w') as tmp:
-            writer = csv.writer(tmp, delimiter=delimiter)
-            writer.writerows(records)
-            await upload_file(bucket=s3_destination_bucket, filepath=tmp.name, s3_key=s3_destination_key)
+        await tsv_to_s3(records, delimiter, s3_destination_bucket, s3_destination_key)
 
     except Exception as err:
         print(err)
         return str(err)
 
     return None
+
+
+def tsv_to_records(path: str, encoding: str, delimiter: str, header: str) -> tuple:
+    """Translate the file data into 2-dimensional list for efficient
+    processing.
+
+    Args:
+        path (str): Enrollment file path
+        encoding (str): File encoding
+        delimiter (str): Delimiter
+        header (Union[str, None], optional): Header. Defaults to None.
+
+    Returns:
+        tuple: Records as list of lists, header
+    """
+
+    records = []
+    with open(path, newline='', encoding=encoding) as csvfile:
+
+        dialect = csv.Sniffer().sniff(csvfile.read(4096))
+        csvfile.seek(0)
+
+        # remove any null characters in the file
+        reader = csv.reader((line.replace('\0', '') for line in csvfile), dialect=dialect, delimiter=delimiter, doublequote=True)
+        for row in reader:
+
+            if reader.line_num == 1:
+                if header is None:
+                    header = row
+                elif header != row:
+                    raise ValueError("Every file must contain the exact same header")
+                else:
+                    continue
+
+            records.append(row)
+
+    return records, header
+
+
+def xlsx_to_records(filepath: str, header: Union[str, None]=None) -> tuple:
+    """Load excel file to records object as list of lists.
+
+    Args:
+        filepath (str): Temporary Filepath
+        header (Union[str, None], optional): Header. Defaults to None.
+
+    Raises:
+        ValueError: Excel sheets must contain the exact same header
+
+    Returns:
+        tuple: Records as list of lists, header
+    """
+
+    records = []
+    workbook = load_workbook(filepath, read_only=True)
+    for sheet in workbook:
+        if sheet.title != 'hiddenSheet':
+            sheet.calculate_dimension(force=True)
+
+            for idx, row in enumerate(sheet.values):
+                items = [str(value) if value is not None else "" for value in row]
+
+                if idx == 0:
+                    if header is None:
+                        header = items
+                    elif header != items:
+                        raise ValueError("Excel sheets must contain the exact same header")
+                    else:
+                        continue
+
+                records.append(items)
+    workbook.close()
+
+    return records, header
+
+
+async def tsv_to_s3(records: str, delimiter: str, s3_bucket: str, s3_key: str):
+    """Write records to tsv/csv file then upload to s3.
+
+    Args:
+        records (str): list of lists with values as strings
+        delimiter (str): File delimiter
+        s3_bucket (str): destination s3 bucket
+        s3_key (str): destination s3 key
+    """
+
+    with NamedTemporaryFile(mode='w') as tmp:
+        writer = csv.writer(tmp, delimiter=delimiter)
+        writer.writerows(records)
+        await upload_file(bucket=s3_bucket, filepath=tmp.name, s3_key=s3_key)
+
+
+def detect_encoding(path: str) -> str:
+    """Detect enrollment file encoding.
+
+    Args:
+        path (str): Enrollment file path
+
+    Returns:
+        str: Enrollment file encoding
+    """
+
+    encoding = None
+    with open(path, "rb") as handle:
+        encoding_dict = chardet.detect(handle.read())
+        if 'encoding' in encoding_dict:
+            encoding = encoding_dict['encoding'].upper()
+
+    return encoding
+
+
+def detect_delimiter(path: str, encoding: str) -> str:
+    """Detect enrollment file delimiter.
+
+    Args:
+        path (str): Enrollment file path
+        encoding (str): File encoding
+
+    Returns:
+        str: Delimiter
+    """
+
+    delimiter = ''
+    with open(path, newline='', encoding=encoding) as csvfile:
+        data = csvfile.read(4096)
+        count = -1
+        for item in [',', '\t', '|']:
+            char_count = data.count(item)
+            if char_count > count:
+                delimiter = item
+                count = char_count
+
+    return delimiter
diff --git a/aioradio/pyodbc.py b/aioradio/pyodbc.py
index 0591021..782b2ed 100644
--- a/aioradio/pyodbc.py
+++ b/aioradio/pyodbc.py
@@ -17,7 +17,8 @@
     '/usr/lib/libtdsodbc.so',
     '/usr/local/lib/libtdsodbc.so',
     '/usr/lib/x86_64-linux-gnu/odbc/libtdsodbc.so',
-    '/opt/microsoft/msodbcsql/lib64/opt/microsoft/msodbcsql17/lib64/libmsodbcsql-17.7.so.2.1'
+    '/opt/microsoft/msodbcsql/lib64/opt/microsoft/msodbcsql17/lib64/libmsodbcsql-17.7.so.2.1',
+    '/opt/microsoft/msodbcsql17/lib64/libmsodbcsql-17.10.so.1.1'
 ]
diff --git a/aioradio/requirements.txt b/aioradio/requirements.txt
index f0d2e53..e27b124 100644
--- a/aioradio/requirements.txt
+++ b/aioradio/requirements.txt
@@ -4,6 +4,7 @@ aiojobs==1.1.0
 backoff==2.2.1
 boto3==1.24.59
 botocore==1.27.59
+cchardet==2.1.7
 ddtrace==1.3.6
 dominodatalab==1.2.0
 fakeredis==1.9.4
diff --git a/setup.py b/setup.py
index a53279e..3c208b1 100644
--- a/setup.py
+++ b/setup.py
@@ -7,7 +7,7 @@
     long_description = fileobj.read()
 
 setup(name='aioradio',
-      version='0.17.20',
+      version='0.17.21',
       description='Generic asynchronous i/o python utilities for AWS services (SQS, S3, DynamoDB, Secrets Manager), Redis, MSSQL (pyodbc), JIRA and more',
       long_description=long_description,
       long_description_content_type="text/markdown",
@@ -26,6 +26,7 @@
          'backoff>=2.1.2',
          'botocore==1.27.59',
          'boto3==1.24.59',
+         'cchardet>=2.1.7',
          'ddtrace>=0.60.1',
          'dominodatalab>=1.1.1',
          'fakeredis>=1.7.1',
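
Usage note (not part of the patch): a minimal sketch of how the xlsx_to_tsv and zipfile_to_tsv coroutines added above might be called once this release is installed. The bucket names and S3 keys below are placeholders, not values taken from this changeset, and both calls assume valid AWS credentials are configured.

import asyncio

from aioradio.file_ingestion import xlsx_to_tsv, zipfile_to_tsv


async def main():
    # Convert a zip archive of xlsx/csv/tsv/txt files into a single TSV in S3.
    # Both coroutines return None on success or an error string on failure.
    err = await zipfile_to_tsv(
        s3_source_bucket='example-source-bucket',        # placeholder bucket
        s3_source_key='incoming/enrollment-batch.zip',   # placeholder key
        s3_destination_bucket='example-destination-bucket',
        s3_destination_key='processed/enrollment.tsv',
        delimiter='\t'
    )
    if err is not None:
        print(f"zipfile_to_tsv failed: {err}")

    # Converting a single workbook works the same way via xlsx_to_tsv;
    # passing delimiter=',' would produce a CSV instead of a TSV.
    err = await xlsx_to_tsv(
        s3_source_bucket='example-source-bucket',
        s3_source_key='incoming/enrollment.xlsx',
        s3_destination_bucket='example-destination-bucket',
        s3_destination_key='processed/enrollment.tsv',
        delimiter='\t'
    )
    if err is not None:
        print(f"xlsx_to_tsv failed: {err}")


asyncio.run(main())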