From de25b498826b2e2cb87895478b01accc29874368 Mon Sep 17 00:00:00 2001
From: maxim-lisovsky-gismart
Date: Wed, 24 Apr 2024 14:00:52 +0200
Subject: [PATCH 1/2] add unload_data function

---
 README.md          |  2 +-
 bi_utils/aws/db.py | 63 +++++++++++++++++++++++++++++++++++++---------
 requirements.txt   |  2 +-
 setup.py           |  2 +-
 4 files changed, 54 insertions(+), 15 deletions(-)

diff --git a/README.md b/README.md
index c89268c..e2cfba4 100644
--- a/README.md
+++ b/README.md
@@ -15,7 +15,7 @@ Add `--upgrade` option to update existing package to a new version
 Specify package link in your `requirements.txt`:
 
 ```txt
-git+https://github.com/gismart/bi-utils@0.16.3#egg=bi-utils-gismart
+git+https://github.com/gismart/bi-utils@0.17.0#egg=bi-utils-gismart
 ```
 
 ### Usage
diff --git a/bi_utils/aws/db.py b/bi_utils/aws/db.py
index a81224d..356cc67 100644
--- a/bi_utils/aws/db.py
+++ b/bi_utils/aws/db.py
@@ -95,19 +95,10 @@ def download_files(
     add_timestamp_dir: bool = True,
     add_s3_timestamp_dir: bool = True,
 ) -> Sequence[str]:
-    """Copy data from RedShift to S3 and download csv or parquet files up to 6.2 GB"""
-    max_chunk_size_opt = f"MAXFILESIZE {max_chunk_size_mb} MB"
-    if file_format.lower() == "csv":
-        unload_options = ["CSV", "HEADER", "GZIP", "PARALLEL ON", max_chunk_size_opt]
-    elif file_format.lower() == "parquet":
+    """Copy data from RedShift to S3 and download csv or parquet files up to `max_chunk_size_mb`"""
+    unload_options = _get_unload_options(file_format, delete_s3_before, max_chunk_size_mb)
+    if file_format.lower() == "parquet":
         separator = None
-        unload_options = ["PARQUET", "PARALLEL ON", max_chunk_size_opt]
-    else:
-        raise ValueError(f"{file_format} file format is not supported")
-    if delete_s3_before:
-        unload_options.append("CLEANPATH")
-    else:
-        unload_options.append("ALLOWOVERWRITE")
     if add_s3_timestamp_dir:
         bucket_dir = _add_timestamp_dir(bucket_dir, postfix="/", posix=True)
     elif not bucket_dir.endswith("/"):
@@ -262,6 +253,32 @@ def download_data(
     return data
 
 
+def unload_data(
+    query: str,
+    file_format: str = "csv",
+    *,
+    bucket: str = "gismart-analytics",
+    bucket_dir: str = "dwh/temp",
+    delete_s3_before: bool = False,
+    secret_id: str = connection.DEFAULT_SECRET_ID,
+    database: Optional[str] = None,
+    host: Optional[str] = None,
+    max_chunk_size_mb: int = 6000,
+    partition_by: list[str] | None = None,
+) -> Sequence[str]:
+    """Unload data from RedShift to S3 into csv or parquet files up to `max_chunk_size_mb`"""
+    unload_options = _get_unload_options(file_format, delete_s3_before, max_chunk_size_mb, partition_by)
+    if not bucket_dir.endswith("/"):
+        bucket_dir += "/"
+    with connection.get_redshift(secret_id, database=database, host=host) as redshift_locopy:
+        s3_path = redshift_locopy._generate_unload_path(bucket, bucket_dir)
+        redshift_locopy.unload(
+            query=query,
+            s3path=s3_path,
+            unload_options=unload_options,
+        )
+
+
 def read_files(
     file_path: Union[str, Sequence[str]],
     *,
@@ -382,6 +399,28 @@ def _add_timestamp_dir(dir_path: str, postfix: str = "", posix: bool = False) ->
     return dir_path
 
 
+def _get_unload_options(
+    file_format: str = "csv",
+    delete_s3_before: bool = False,
+    max_chunk_size_mb: int = 6000,
+    partition_by: list[str] | None = None,
+) -> list[str]:
+    max_chunk_size_opt = f"MAXFILESIZE {max_chunk_size_mb} MB"
+    if file_format.lower() == "csv":
+        unload_options = ["CSV", "HEADER", "GZIP", "PARALLEL ON", max_chunk_size_opt]
+    elif file_format.lower() == "parquet":
+        unload_options = ["PARQUET", "PARALLEL ON", max_chunk_size_opt]
+    else:
+        raise ValueError(f"{file_format} file format is not supported")
+    if delete_s3_before:
+        unload_options.append("CLEANPATH")
+    else:
+        unload_options.append("ALLOWOVERWRITE")
+    if partition_by:
+        unload_options.append(f"PARTITION BY ({', '.join(partition_by)}) INCLUDE")
+    return unload_options
+
+
 def _read_chunks(
     filenames: Sequence[str],
     parse_bools: Iterable[str],
diff --git a/requirements.txt b/requirements.txt
index 674cc7f..a486dbe 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -5,4 +5,4 @@ psycopg2-binary<3.0.0,>=2.9.0
 scikit-learn<2.0.0,>=0.23.1
 SQLAlchemy<2.0.0,>=1.4.46
 pyarrow>=15.0.0
-locopy==0.5.7
+locopy==0.5.8
diff --git a/setup.py b/setup.py
index b6c1fcf..80f5e44 100644
--- a/setup.py
+++ b/setup.py
@@ -9,7 +9,7 @@
 
 setuptools.setup(
     name="bi-utils-gismart",
-    version="0.16.3",
+    version="0.17.0",
     author="gismart",
     author_email="info@gismart.com",
     description="Utils for BI team",

From 09e0c7026b4fe17a9db856e3bf34844a170fb866 Mon Sep 17 00:00:00 2001
From: maxim-lisovsky-gismart
Date: Wed, 24 Apr 2024 17:18:35 +0200
Subject: [PATCH 2/2] fix line length

---
 bi_utils/aws/db.py | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/bi_utils/aws/db.py b/bi_utils/aws/db.py
index 356cc67..4c39815 100644
--- a/bi_utils/aws/db.py
+++ b/bi_utils/aws/db.py
@@ -267,7 +267,9 @@ def unload_data(
     partition_by: list[str] | None = None,
 ) -> Sequence[str]:
     """Unload data from RedShift to S3 into csv or parquet files up to `max_chunk_size_mb`"""
-    unload_options = _get_unload_options(file_format, delete_s3_before, max_chunk_size_mb, partition_by)
+    unload_options = _get_unload_options(
+        file_format, delete_s3_before, max_chunk_size_mb, partition_by
+    )
     if not bucket_dir.endswith("/"):
         bucket_dir += "/"
     with connection.get_redshift(secret_id, database=database, host=host) as redshift_locopy:
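
A minimal usage sketch of the new `unload_data` function from PATCH 1/2, assuming the package is installed and importable as `bi_utils.aws.db` and that AWS credentials and the Redshift secret are already configured; the query, S3 prefix, and partition column below are hypothetical:

```python
from bi_utils.aws import db

# Unload query results from Redshift to S3 as parquet files.
# partition_by adds "PARTITION BY (...) INCLUDE" to the UNLOAD options, and
# leaving delete_s3_before=False keeps the default ALLOWOVERWRITE behaviour.
db.unload_data(
    query="SELECT user_id, event_date, revenue FROM analytics.purchases",  # hypothetical query
    file_format="parquet",
    bucket_dir="dwh/temp/purchases",  # hypothetical S3 prefix
    partition_by=["event_date"],  # hypothetical partition column
    max_chunk_size_mb=1000,
)
```

The shared `_get_unload_options` helper keeps the UNLOAD option handling in one place for both `download_files` and `unload_data`, which is why `download_files` is refactored to call it in the same commit.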