Skip to content

Commit

Permalink
Merge pull request #47 from gismart/unload
Browse files Browse the repository at this point in the history
Add unload_data function
  • Loading branch information
maxim-lisovsky-gismart authored Apr 24, 2024
2 parents 4801cee + 09e0c70 commit c51bb95
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 15 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ Add `--upgrade` option to update existing package to a new version
Specify package link in your `requirements.txt`:

```txt
git+https://github.com/gismart/bi-utils@0.16.3#egg=bi-utils-gismart
git+https://github.com/gismart/bi-utils@0.17.0#egg=bi-utils-gismart
```

### Usage
Expand Down
65 changes: 53 additions & 12 deletions bi_utils/aws/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,19 +95,10 @@ def download_files(
add_timestamp_dir: bool = True,
add_s3_timestamp_dir: bool = True,
) -> Sequence[str]:
"""Copy data from RedShift to S3 and download csv or parquet files up to 6.2 GB"""
max_chunk_size_opt = f"MAXFILESIZE {max_chunk_size_mb} MB"
if file_format.lower() == "csv":
unload_options = ["CSV", "HEADER", "GZIP", "PARALLEL ON", max_chunk_size_opt]
elif file_format.lower() == "parquet":
"""Copy data from RedShift to S3 and download csv or parquet files up to `max_chunk_size_mb`"""
unload_options = _get_unload_options(file_format, delete_s3_before, max_chunk_size_mb)
if file_format.lower() == "parquet":
separator = None
unload_options = ["PARQUET", "PARALLEL ON", max_chunk_size_opt]
else:
raise ValueError(f"{file_format} file format is not supported")
if delete_s3_before:
unload_options.append("CLEANPATH")
else:
unload_options.append("ALLOWOVERWRITE")
if add_s3_timestamp_dir:
bucket_dir = _add_timestamp_dir(bucket_dir, postfix="/", posix=True)
elif not bucket_dir.endswith("/"):
Expand Down Expand Up @@ -262,6 +253,34 @@ def download_data(
return data


def unload_data(
query: str,
file_format: str = "csv",
*,
bucket: str = "gismart-analytics",
bucket_dir: str = "dwh/temp",
delete_s3_before: bool = False,
secret_id: str = connection.DEFAULT_SECRET_ID,
database: Optional[str] = None,
host: Optional[str] = None,
max_chunk_size_mb: int = 6000,
partition_by: list[str] | None = None,
) -> Sequence[str]:
"""Unload data from RedShift to S3 into csv or parquet files up to `max_chunk_size_mb`"""
unload_options = _get_unload_options(
file_format, delete_s3_before, max_chunk_size_mb, partition_by
)
if not bucket_dir.endswith("/"):
bucket_dir += "/"
with connection.get_redshift(secret_id, database=database, host=host) as redshift_locopy:
s3_path = redshift_locopy._generate_unload_path(bucket, bucket_dir)
redshift_locopy.unload(
query=query,
s3path=s3_path,
unload_options=unload_options,
)


def read_files(
file_path: Union[str, Sequence[str]],
*,
Expand Down Expand Up @@ -382,6 +401,28 @@ def _add_timestamp_dir(dir_path: str, postfix: str = "", posix: bool = False) ->
return dir_path


def _get_unload_options(
file_format: str = "csv",
delete_s3_before: bool = False,
max_chunk_size_mb: int = 6000,
partition_by: list[str] | None = None,
) -> list[str]:
max_chunk_size_opt = f"MAXFILESIZE {max_chunk_size_mb} MB"
if file_format.lower() == "csv":
unload_options = ["CSV", "HEADER", "GZIP", "PARALLEL ON", max_chunk_size_opt]
elif file_format.lower() == "parquet":
unload_options = ["PARQUET", "PARALLEL ON", max_chunk_size_opt]
else:
raise ValueError(f"{file_format} file format is not supported")
if delete_s3_before:
unload_options.append("CLEANPATH")
else:
unload_options.append("ALLOWOVERWRITE")
if partition_by:
unload_options.append(f"PARTITION BY ({', '.join(partition_by)}) INCLUDE")
return unload_options


def _read_chunks(
filenames: Sequence[str],
parse_bools: Iterable[str],
Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,4 @@ psycopg2-binary<3.0.0,>=2.9.0
scikit-learn<2.0.0,>=0.23.1
SQLAlchemy<2.0.0,>=1.4.46
pyarrow>=15.0.0
locopy==0.5.7
locopy==0.5.8
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

setuptools.setup(
name="bi-utils-gismart",
version="0.16.3",
version="0.17.0",
author="gismart",
author_email="[email protected]",
description="Utils for BI team",
Expand Down

0 comments on commit c51bb95

Please sign in to comment.