Add zipfile to tsv/csv file function

tim.reichard committed Nov 8, 2022
1 parent 6102c1c commit de5f424
Showing 5 changed files with 217 additions and 29 deletions.
6 changes: 6 additions & 0 deletions HISTORY.rst
@@ -3,6 +3,12 @@ History
=======


v0.17.21 (2022-11-08)

* Add zipfile to tsv/csv file function in file_ingestion.py.
* Add new unixodbc driver path: /opt/microsoft/msodbcsql17/lib64/libmsodbcsql-17.10.so.1.1.


v0.17.20 (2022-11-02)

* Allow xlsx_to_tsv function to work on python3.7+ by removing python3.10 specific typing using |.
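For context on that last entry, a minimal illustration of the typing change (the function name here is hypothetical): the python3.7-compatible spelling uses typing.Union in place of the python3.10-only | operator.

from typing import Union

def find_header(path: str) -> Union[str, None]:  # hypothetical example; runs on python3.7+
    return None

# The python3.10-only spelling this release moved away from:
# def find_header(path: str) -> str | None: ...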
233 changes: 206 additions & 27 deletions aioradio/file_ingestion.py
@@ -10,6 +10,7 @@
# pylint: disable=too-many-branches
# pylint: disable=too-many-instance-attributes
# pylint: disable=too-many-lines
# pylint: disable=too-many-locals
# pylint: disable=too-many-nested-blocks
# pylint: disable=too-many-public-methods

@@ -28,10 +29,11 @@
from dataclasses import field as dc_field
from datetime import datetime, timedelta, timezone, tzinfo
from pathlib import Path
-from tempfile import NamedTemporaryFile
+from tempfile import NamedTemporaryFile, TemporaryDirectory
from types import coroutine
from typing import Any, Dict, List, Union

import cchardet as chardet
import mandrill
import numpy as np
from openpyxl import load_workbook
@@ -1623,13 +1625,13 @@ async def get_ftp_file_attributes(conn: SMBConnection, service_name: str, ftp_pa


async def xlsx_to_tsv(
-        s3_source_bucket: str,
-        s3_source_key: str,
-        s3_destination_bucket: str,
-        s3_destination_key: str,
-        delimiter: str='\t'
+    s3_source_bucket: str,
+    s3_source_key: str,
+    s3_destination_bucket: str,
+    s3_destination_key: str,
+    delimiter: str='\t'
) -> Union[str, None]:
"""Convert and xlsx file to csv/tsv file.
"""Convert xlsx file to csv/tsv file.
Args:
s3_source_bucket (str): source xlsx file s3 bucket
@@ -1643,36 +1645,213 @@ async def xlsx_to_tsv(
"""

try:
with NamedTemporaryFile(suffix='.xlsx') as tmp:
await download_file(bucket=s3_source_bucket, filepath=tmp.name, s3_key=s3_source_key)
records, _ = xlsx_to_records(tmp)

await tsv_to_s3(records, delimiter, s3_destination_bucket, s3_destination_key)
except Exception as err:
print(err)
return str(err)

return None
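A minimal usage sketch for the refactored xlsx_to_tsv above; the bucket and key names are placeholders, and configured AWS credentials are assumed:

import asyncio

from aioradio.file_ingestion import xlsx_to_tsv

async def main():
    # Returns None on success, else the error message as a string
    err = await xlsx_to_tsv(
        s3_source_bucket='my-bucket',               # placeholder
        s3_source_key='raw/enrollment.xlsx',        # placeholder
        s3_destination_bucket='my-bucket',          # placeholder
        s3_destination_key='clean/enrollment.tsv',  # placeholder
        delimiter='\t')
    if err is not None:
        print(f'xlsx conversion failed: {err}')

asyncio.run(main())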


async def zipfile_to_tsv(
s3_source_bucket: str,
s3_source_key: str,
s3_destination_bucket: str,
s3_destination_key: str,
delimiter: str='\t'
) -> Union[str, None]:
"""Convert zipfile to csv/tsv file.
Args:
s3_source_bucket (str): source zipfile s3 bucket
s3_source_key (str): source zipfile s3 key
        s3_destination_bucket (str): destination tsv/csv file s3 bucket
        s3_destination_key (str): destination tsv/csv file s3 key
delimiter (str, optional): Delimiter. Defaults to '\t'.
Returns:
Union[str, None]: Error message during process else None
"""

try:
extensions = ['xlsx', 'txt', 'csv', 'tsv']
records = []
header = None

-        with NamedTemporaryFile(suffix='.xlsx') as tmp:
+        with NamedTemporaryFile(suffix='.zip') as tmp:
            await download_file(bucket=s3_source_bucket, filepath=tmp.name, s3_key=s3_source_key)
-            workbook = load_workbook(tmp.name, read_only=True)
-            for sheet in workbook:
-                sheet.calculate_dimension(force=True)
-
-                for idx, row in enumerate(sheet.values):
-                    items = [str(value) if value is not None else "" for value in row]
-
-                    if idx == 0:
-                        if header is None:
-                            header = items
-                        elif header != items:
-                            raise ValueError("Excel sheets must contain the exact same header")
-
-                    records.append(items)
-            workbook.close()
-
-        with NamedTemporaryFile(mode='w') as tmp:
-            writer = csv.writer(tmp, delimiter=delimiter)
-            writer.writerows(records)
-            await upload_file(bucket=s3_destination_bucket, filepath=tmp.name, s3_key=s3_destination_key)
+            with TemporaryDirectory() as tmp_directory:
+                for path in await unzip_file_get_filepaths(tmp.name, tmp_directory, include_extensions=extensions):
+                    ext = os.path.splitext(path)[1].lower()
+                    if ext == '.xlsx':
+                        records_from_path, header = xlsx_to_records(path, header)
+                        records.extend(records_from_path)
+                    else:
+                        encoding = detect_encoding(path)
+                        if encoding is None:
+                            raise IOError(f"Failed to detect proper encoding for {path}")
+                        encodings = [encoding] + [i for i in ['UTF-8', 'LATIN-1', 'UTF-16'] if i != encoding]
+                        for encoding in encodings:
+                            try:
+                                delimiter = detect_delimiter(path, encoding)
+                                if delimiter:
+                                    try:
+                                        records_from_path, header = tsv_to_records(path, encoding, delimiter, header)
+                                        records.extend(records_from_path)
+                                        break
+                                    except Exception as err:
+                                        if str(err) == 'Every file must contain the exact same header':
+                                            raise ValueError('Every file must contain the exact same header') from err
+                                        continue
+                            except Exception as err:
+                                if str(err) == 'Every file must contain the exact same header':
+                                    raise ValueError('Every file must contain the exact same header') from err
+                                continue
+                            else:
+                                continue
+                        raise IOError(f"Failed to detect proper encoding for {path}")
+
+        await tsv_to_s3(records, delimiter, s3_destination_bucket, s3_destination_key)

except Exception as err:
print(err)
return str(err)

return None
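A matching sketch for the new zipfile_to_tsv; the same placeholder caveats apply. Note the design choice: every xlsx/txt/csv/tsv member inside the archive is merged into one output file, so all members must share the exact same header row.

import asyncio

from aioradio.file_ingestion import zipfile_to_tsv

# A mismatched header in any zip member surfaces as the returned error string
err = asyncio.run(zipfile_to_tsv(
    s3_source_bucket='my-bucket',              # placeholder
    s3_source_key='raw/enrollment.zip',        # placeholder
    s3_destination_bucket='my-bucket',         # placeholder
    s3_destination_key='clean/enrollment.tsv'  # placeholder
))
print('ok' if err is None else f'failed: {err}')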


def tsv_to_records(path: str, encoding: str, delimiter: str, header: Union[str, None]) -> tuple:
    """Translate the file data into a 2-dimensional list for efficient
    processing.
    Args:
        path (str): Enrollment file path
        encoding (str): File encoding
        delimiter (str): Delimiter
        header (Union[str, None]): Header from a previously processed file, else None
    Returns:
        tuple: Records as list of lists, header
    """

records = []
with open(path, newline='', encoding=encoding) as csvfile:

dialect = csv.Sniffer().sniff(csvfile.read(4096))
csvfile.seek(0)

# remove any null characters in the file
reader = csv.reader((line.replace('\0', '') for line in csvfile), dialect=dialect, delimiter=delimiter, doublequote=True)
for row in reader:

if reader.line_num == 1:
if header is None:
header = row
elif header != row:
raise ValueError("Every file must contain the exact same header")
else:
continue

records.append(row)

return records, header
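A self-contained sketch of the helper above; the sample path and data are invented. Note that the header row lands in records the first time through, while matching headers from later files are skipped.

import csv

from aioradio.file_ingestion import tsv_to_records

# Write a tiny sample file so the snippet runs on its own
with open('/tmp/sample.csv', 'w', newline='', encoding='UTF-8') as fileobj:
    csv.writer(fileobj).writerows([['id', 'name'], ['1', 'Ada']])

records, header = tsv_to_records('/tmp/sample.csv', 'UTF-8', ',', None)
print(header)   # ['id', 'name']
print(records)  # [['id', 'name'], ['1', 'Ada']]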


def xlsx_to_records(filepath: str, header: Union[str, None]=None) -> tuple:
"""Load excel file to records object as list of lists.
Args:
filepath (str): Temporary Filepath
header (Union[str, None], optional): Header. Defaults to None.
Raises:
ValueError: Excel sheets must contain the exact same header
Returns:
tuple: Records as list of lists, header
"""

records = []
workbook = load_workbook(filepath, read_only=True)
for sheet in workbook:
if sheet.title != 'hiddenSheet':
sheet.calculate_dimension(force=True)

for idx, row in enumerate(sheet.values):
items = [str(value) if value is not None else "" for value in row]

if idx == 0:
if header is None:
header = items
elif header != items:
raise ValueError("Excel sheets must contain the exact same header")
else:
continue

records.append(items)
workbook.close()

return records, header
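Likewise for xlsx_to_records, a sketch using an invented throwaway workbook:

from openpyxl import Workbook

from aioradio.file_ingestion import xlsx_to_records

# Build a one-sheet workbook so the snippet runs on its own
workbook = Workbook()
sheet = workbook.active
sheet.append(['id', 'name'])
sheet.append([1, 'Ada'])
workbook.save('/tmp/sample.xlsx')

records, header = xlsx_to_records('/tmp/sample.xlsx')
print(header)   # ['id', 'name']
print(records)  # [['id', 'name'], ['1', 'Ada']] -- cells coerced to str, None becomes ''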


async def tsv_to_s3(records: list, delimiter: str, s3_bucket: str, s3_key: str):
    """Write records to tsv/csv file then upload to s3.
    Args:
        records (list): list of lists with values as strings
        delimiter (str): File delimiter
        s3_bucket (str): destination s3 bucket
        s3_key (str): destination s3 key
    """

with NamedTemporaryFile(mode='w') as tmp:
writer = csv.writer(tmp, delimiter=delimiter)
writer.writerows(records)
await upload_file(bucket=s3_bucket, filepath=tmp.name, s3_key=s3_key)
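The same write-then-read-by-path pattern, sketched with the standard library only (no AWS). An explicit flush is included here since buffered rows may not be on disk yet when the file is re-read by name, and re-opening an open NamedTemporaryFile assumes a POSIX host:

import csv
from tempfile import NamedTemporaryFile

records = [['id', 'name'], ['1', 'Ada']]

with NamedTemporaryFile(mode='w', newline='') as tmp:
    csv.writer(tmp, delimiter='\t').writerows(records)
    tmp.flush()  # push buffered rows to disk before reading tmp.name
    with open(tmp.name, encoding='utf-8') as readback:
        print(readback.read())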


def detect_encoding(path: str) -> Union[str, None]:
    """Detect enrollment file encoding.
    Args:
        path (str): Enrollment file path
    Returns:
        Union[str, None]: Enrollment file encoding, else None if undetected
    """

encoding = None
with open(path, "rb") as handle:
encoding_dict = chardet.detect(handle.read())
if 'encoding' in encoding_dict:
encoding = encoding_dict['encoding'].upper()

return encoding
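A quick illustration of the cchardet call behind this helper; note that detect can report {'encoding': None} for undecodable bytes, which callers may want to guard against before upper-casing:

import cchardet as chardet

result = chardet.detect('id,name\n1,Ada\n'.encode('utf-8'))
print(result)  # e.g. {'encoding': 'ASCII', 'confidence': ...}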


def detect_delimiter(path: str, encoding: str) -> str:
"""Detect enrollment file delimiter.
Args:
path (str): Enrollment file path
encoding (str): File encoding
Returns:
str: Delimiter
"""

delimiter = ''
with open(path, newline='', encoding=encoding) as csvfile:
data = csvfile.read(4096)
count = -1
for item in [',', '\t', '|']:
char_count = data.count(item)
if char_count > count:
delimiter = item
count = char_count

return delimiter
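A self-contained sketch of the frequency-count heuristic above (the sample path is invented):

from aioradio.file_ingestion import detect_delimiter

# '|' appears most often among the candidates ',', '\t' and '|', so it wins
with open('/tmp/sample.psv', 'w', encoding='UTF-8') as fileobj:
    fileobj.write('id|name\n1|Ada\n')

print(detect_delimiter('/tmp/sample.psv', 'UTF-8'))  # '|'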
3 changes: 2 additions & 1 deletion aioradio/pyodbc.py
@@ -17,7 +17,8 @@
'/usr/lib/libtdsodbc.so',
'/usr/local/lib/libtdsodbc.so',
'/usr/lib/x86_64-linux-gnu/odbc/libtdsodbc.so',
-    '/opt/microsoft/msodbcsql/lib64/opt/microsoft/msodbcsql17/lib64/libmsodbcsql-17.7.so.2.1'
+    '/opt/microsoft/msodbcsql/lib64/opt/microsoft/msodbcsql17/lib64/libmsodbcsql-17.7.so.2.1',
+    '/opt/microsoft/msodbcsql17/lib64/libmsodbcsql-17.10.so.1.1'
]
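Purely as an illustration of how a candidate list like this is commonly consumed (an assumption, not the module's actual logic): probe each path and take the first driver library present on the host.

import os
from typing import List, Union

def first_existing_driver(paths: List[str]) -> Union[str, None]:
    # Hypothetical helper: return the first ODBC driver library found on disk
    for path in paths:
        if os.path.exists(path):
            return path
    return None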


1 change: 1 addition & 0 deletions aioradio/requirements.txt
@@ -4,6 +4,7 @@ aiojobs==1.1.0
backoff==2.2.1
boto3==1.24.59
botocore==1.27.59
cchardet==2.1.7
ddtrace==1.3.6
dominodatalab==1.2.0
fakeredis==1.9.4
3 changes: 2 additions & 1 deletion setup.py
@@ -7,7 +7,7 @@
long_description = fileobj.read()

setup(name='aioradio',
-      version='0.17.20',
+      version='0.17.21',
description='Generic asynchronous i/o python utilities for AWS services (SQS, S3, DynamoDB, Secrets Manager), Redis, MSSQL (pyodbc), JIRA and more',
long_description=long_description,
long_description_content_type="text/markdown",
@@ -26,6 +26,7 @@
'backoff>=2.1.2',
'botocore==1.27.59',
'boto3==1.24.59',
'cchardet>=2.1.7',
'ddtrace>=0.60.1',
'dominodatalab>=1.1.1',
'fakeredis>=1.7.1',
