Add canales s3 functions #885

Merged: 16 commits, Nov 1, 2023
20 changes: 20 additions & 0 deletions parsons/aws/s3.py
@@ -1,6 +1,7 @@
import re
import boto3
from botocore.client import ClientError

from parsons.utilities import files
import logging
import os
@@ -455,3 +456,22 @@ def transfer_bucket(
object_acl.put(ACL="public-read")

logger.info(f"Finished syncing {len(key_list)} keys")

def get_buckets_with_subname(self, bucket_subname):
"""
Grabs a type of bucket based on naming convention.

`Args:`
subname: str
This will most commonly be a 'vendor'

`Returns:`
list
list of buckets

"""

all_buckets = self.list_buckets()
buckets = [x for x in all_buckets if bucket_subname in x.split("-")]

return buckets
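
Not part of the diff: a minimal usage sketch for the new helper, assuming a configured Parsons `S3` connector. The bucket names below are hypothetical and only illustrate the matching behavior.

```python
from parsons import S3

s3 = S3()  # credentials picked up from the usual AWS environment variables

# Suppose list_buckets() returns these (hypothetical) names:
#   "vendorx-raw-data", "vendorx-archive", "internal-logs"
vendor_buckets = s3.get_buckets_with_subname("vendorx")
# -> ["vendorx-raw-data", "vendorx-archive"]
# "internal-logs" is excluded because matching is against the hyphen-split
# segments of each bucket name, not arbitrary substrings.
```
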
60 changes: 60 additions & 0 deletions parsons/databases/redshift/redshift.py
@@ -811,6 +811,66 @@ def unload(

return self.query(statement)

def drop_and_unload(
self,
rs_table,
bucket,
key,
cascade=True,
manifest=True,
header=True,
delimiter="|",
compression="gzip",
add_quotes=True,
escape=True,
allow_overwrite=True,
parallel=True,
max_file_size="6.2 GB",
aws_region=None,
):
"""
Unload data to s3, and then drop Redshift table

Args:
rs_table: str
Redshift table.

bucket: str
S3 bucket

key: str
S3 key prefix ahead of table name

cascade: bool
whether to drop cascade

***unload params

Returns:
None
"""
query_end = "cascade" if cascade else ""

self.unload(
sql=f"select * from {rs_table}",
bucket=bucket,
key_prefix=f"{key}/{rs_table.replace('.','_')}/",
manifest=manifest,
header=header,
delimiter=delimiter,
compression=compression,
add_quotes=add_quotes,
escape=escape,
allow_overwrite=allow_overwrite,
parallel=parallel,
max_file_size=max_file_size,
aws_region=aws_region,
)

self.query(f"drop table if exists {rs_table} {query_end}")

return None
Reviewer comment (Contributor): You don't technically need to return `None` explicitly, but it doesn't hurt anything.


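Not part of the diff: a minimal usage sketch for `drop_and_unload`, assuming a configured Parsons `Redshift` connector. The table, bucket, and key names are hypothetical.

```python
from parsons import Redshift

rs = Redshift()  # connection settings come from the standard environment variables

# Archive a table to S3, then drop it from Redshift (CASCADE is the default).
rs.drop_and_unload(
    rs_table="analytics.stale_events",
    bucket="my-archive-bucket",
    key="archives",
)
# The unloaded files land under
#   s3://my-archive-bucket/archives/analytics_stale_events/
# because the key prefix is built as f"{key}/{rs_table.replace('.', '_')}/".
```
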
def generate_manifest(
self,
buckets,
22 changes: 22 additions & 0 deletions test/test_redshift.py
@@ -644,6 +644,28 @@ def test_unload(self):
# Check that files are there
self.assertTrue(self.s3.key_exists(self.temp_s3_bucket, "unload_test"))

def test_drop_and_unload(self):

rs_table_test = f"{self.temp_schema}.test_copy"
# Copy a table to Redshift
self.rs.copy(self.tbl, rs_table_test, if_exists="drop")

key = "unload_test"

# Unload a table to S3
self.rs.drop_and_unload(
rs_table=rs_table_test,
bucket=self.temp_s3_bucket,
key=key,
)

key_prefix = f"{key}/{rs_table_test.replace('.','_')}/"

# Check that files are there
self.assertTrue(self.s3.key_exists(self.temp_s3_bucket, key_prefix))

self.assertFalse(self.rs.table_exists(rs_table_test))

def test_to_from_redshift(self):

# Test the parsons table methods
16 changes: 16 additions & 0 deletions test/test_s3.py
@@ -30,10 +30,16 @@ def setUp(self):
self.tbl = Table([{"first": "Bob", "last": "Smith"}])
csv_path = self.tbl.to_csv()

self.test_incoming_prefix = "incoming"
self.test_processing_prefix = "processing"
self.test_dest_prefix = "archive"

self.test_key_2 = "test2.csv"
self.tbl_2 = Table([{"first": "Jack", "last": "Smith"}])
csv_path_2 = self.tbl_2.to_csv()

self.test_bucket_subname = self.test_bucket.split("-")[0]

# Sometimes it runs into issues putting the file
retry = 1

@@ -183,3 +189,13 @@ def test_transfer_bucket(self):
result_tbl_2 = Table.from_csv(path_2)
assert_matching_tables(self.tbl_2, result_tbl_2)
self.assertFalse(self.s3.key_exists(self.test_bucket, self.test_key_2))

def test_get_buckets_with_subname(self):

buckets_with_subname_true = self.s3.get_buckets_with_subname(
self.test_bucket_subname
)
self.assertTrue(self.test_bucket in buckets_with_subname_true)

buckets_with_subname_false = self.s3.get_buckets_with_subname(
"bucketsubnamedoesnotexist"
)
self.assertFalse(self.test_bucket in buckets_with_subname_false)