From a2d726567c2127de250546e9c9a53add6767431b Mon Sep 17 00:00:00 2001
From: "tim.reichard"
Date: Fri, 28 Oct 2022 16:36:08 -0500
Subject: [PATCH] Update xlsx_to_tsv function to use s3

---
 HISTORY.rst                |  5 ++++
 aioradio/file_ingestion.py | 63 ++++++++++++++++++++++++++++---------------
 setup.py                   |  2 +-
 3 files changed, 45 insertions(+), 25 deletions(-)

diff --git a/HISTORY.rst b/HISTORY.rst
index 792892d..0be9e2f 100644
--- a/HISTORY.rst
+++ b/HISTORY.rst
@@ -3,6 +3,11 @@ History
 =======
 
+v0.17.19 (2022-10-28)
+
+* Update xlsx_to_tsv function to use s3 instead of local directories for origin/destination files.
+
+
 v0.17.18 (2022-10-21)
 
 * Add a function in file_ingestion to convert an xlsx file to csv/tsv.
 
diff --git a/aioradio/file_ingestion.py b/aioradio/file_ingestion.py
index 52014e2..73e628a 100644
--- a/aioradio/file_ingestion.py
+++ b/aioradio/file_ingestion.py
@@ -28,6 +28,7 @@
 from dataclasses import field as dc_field
 from datetime import datetime, timedelta, timezone, tzinfo
 from pathlib import Path
+from tempfile import NamedTemporaryFile
 from types import coroutine
 from typing import Any, Dict, List
 
@@ -38,6 +39,7 @@
 from smb.smb_structs import OperationFailure
 from smb.SMBConnection import SMBConnection
 
+from aioradio.aws.s3 import download_file, upload_file
 from aioradio.aws.secrets import get_secret
 from aioradio.psycopg2 import establish_psycopg2_connection
 
@@ -1620,12 +1622,20 @@ async def get_ftp_file_attributes(conn: SMBConnection, service_name: str, ftp_pa
     return conn.getAttributes(service_name=service_name, path=ftp_path)
 
 
-def xlsx_to_tsv(source: str, destination: str, delimiter: str='\t') -> str | None:
-    """Convert and xlsx file to csv/tsv file.
+async def xlsx_to_tsv(
+    s3_source_bucket: str,
+    s3_source_key: str,
+    s3_destination_bucket: str,
+    s3_destination_key: str,
+    delimiter: str='\t'
+) -> str | None:
+    """Convert an xlsx file to a csv/tsv file.
 
     Args:
-        source (str): XLSX filepath to convert
-        destination (str): Destination CSV/TSV filepath
+        s3_source_bucket (str): source xlsx file s3 bucket
+        s3_source_key (str): source xlsx file s3 key
+        s3_destination_bucket (str): destination csv/tsv file s3 bucket
+        s3_destination_key (str): destination csv/tsv file s3 key
         delimiter (str, optional): Delimiter. Defaults to '\t'.
 
     Returns:
@@ -1635,27 +1645,32 @@ def xlsx_to_tsv(source: str, destination: str, delimiter: str='\t') -> str | Non
     try:
         records = []
         header = None
-        workbook = load_workbook(source, read_only=True)
-        for sheet in workbook:
-            sheet.calculate_dimension(force=True)
-
-            for idx, row in enumerate(sheet.values):
-                items = [str(value) if value is not None else "" for value in row]
-
-                if idx == 0:
-                    if header is None:
-                        header = items
-                    elif header != items:
-                        raise ValueError("Excel sheets must contain the exact same header")
-                    else:
-                        continue
-
-                records.append(items)
-        workbook.close()
-
-        with open(destination, 'w', encoding='utf-8') as csvfile:
-            writer = csv.writer(csvfile, delimiter=delimiter)
+
+        with NamedTemporaryFile(suffix='.xlsx') as tmp:
+            await download_file(bucket=s3_source_bucket, filepath=tmp.name, s3_key=s3_source_key)
+            workbook = load_workbook(tmp.name, read_only=True)
+            for sheet in workbook:
+                sheet.calculate_dimension(force=True)
+
+                for idx, row in enumerate(sheet.values):
+                    items = [str(value) if value is not None else "" for value in row]
+
+                    if idx == 0:
+                        if header is None:
+                            header = items
+                        elif header != items:
+                            raise ValueError("Excel sheets must contain the exact same header")
+                        else:
+                            continue
+
+                    records.append(items)
+            workbook.close()
+
+        with NamedTemporaryFile(mode='w', encoding='utf-8') as tmp:
+            writer = csv.writer(tmp, delimiter=delimiter)
             writer.writerows(records)
+            tmp.flush()  # flush buffered rows to disk so upload_file reads the complete file by name
+            await upload_file(bucket=s3_destination_bucket, filepath=tmp.name, s3_key=s3_destination_key)
     except Exception as err:
         print(err)
 
diff --git a/setup.py b/setup.py
index f7b9057..2e54fe0 100644
--- a/setup.py
+++ b/setup.py
@@ -7,7 +7,7 @@
     long_description = fileobj.read()
 
 setup(name='aioradio',
-      version='0.17.18',
+      version='0.17.19',
       description='Generic asynchronous i/o python utilities for AWS services (SQS, S3, DynamoDB, Secrets Manager), Redis, MSSQL (pyodbc), JIRA and more',
       long_description=long_description,
       long_description_content_type="text/markdown",
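
Usage note: a minimal sketch of how the new coroutine might be called. The
bucket and key names below are hypothetical, and the error-string return on
failure is inferred from the -> str | None annotation rather than confirmed
by this patch:

    import asyncio

    from aioradio.file_ingestion import xlsx_to_tsv

    async def main() -> None:
        # Convert s3://my-bucket/raw/report.xlsx into a tab-delimited file
        # uploaded to s3://my-bucket/processed/report.tsv.
        err = await xlsx_to_tsv(
            s3_source_bucket='my-bucket',
            s3_source_key='raw/report.xlsx',
            s3_destination_bucket='my-bucket',
            s3_destination_key='processed/report.tsv',
        )
        if err is not None:
            raise RuntimeError(f'xlsx_to_tsv failed: {err}')

    asyncio.run(main())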